From 2788281062205a0bc9904b31aeae4ef30f7e7643 Mon Sep 17 00:00:00 2001
From: Vlad Dobromyslov
Date: Thu, 23 Feb 2023 17:55:49 +0200
Subject: [PATCH] #501 - concurrent_unordered_set for connection

---
 libraries/net/node.cpp | 824 ++++++++++++++++++++++++++---------------
 1 file changed, 526 insertions(+), 298 deletions(-)

diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp
index 85e8c676..c3198d1d 100644
--- a/libraries/net/node.cpp
+++ b/libraries/net/node.cpp
@@ -128,6 +128,124 @@ namespace graphene { namespace net { namespace detail {

   namespace bmi = boost::multi_index;

+  /*******
+   * A class to wrap std::unordered_set for multithreading
+   */
+  template <class Key, class Hash = std::hash<Key>, class Pred = std::equal_to<Key> >
+  class concurrent_unordered_set : private std::unordered_set<Key, Hash, Pred>
+  {
+  private:
+     mutable fc::mutex mux;
+
+  public:
+     /// Iterations require a lock. This exposes the mutex. Use with care (i.e. lock_guard)
+     fc::mutex& get_mutex()const { return mux; }
+
+     /// Insertion
+     /// @{
+     std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> emplace( Key key)
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::emplace( key );
+     }
+     std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> insert (const Key& val)
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::insert( val );
+     }
+     /// @}
+     /// Size
+     /// @{
+     size_t size() const
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::size();
+     }
+     bool empty() const noexcept
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::empty();
+     }
+     /// @}
+     /// Removal
+     /// @{
+     void clear() noexcept
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        std::unordered_set<Key, Hash, Pred>::clear();
+     }
+     typename std::unordered_set<Key, Hash, Pred>::iterator erase(
+           typename std::unordered_set<Key, Hash, Pred>::const_iterator itr)
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::erase( itr);
+     }
+     size_t erase( const Key& key)
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::erase( key );
+     }
+     /// @}
+     /// Swap
+     /// @{
+     void swap( typename std::unordered_set<Key, Hash, Pred>& other ) noexcept
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        std::unordered_set<Key, Hash, Pred>::swap( other );
+     }
+     /// @}
+     /// Iteration
+     /// @{
+     typename std::unordered_set<Key, Hash, Pred>::iterator begin() noexcept
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::begin();
+     }
+     typename std::unordered_set<Key, Hash, Pred>::const_iterator begin() const noexcept
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::begin();
+     }
+     typename std::unordered_set<Key, Hash, Pred>::local_iterator begin(size_t n)
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::begin(n);
+     }
+     typename std::unordered_set<Key, Hash, Pred>::const_local_iterator begin(size_t n) const
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::begin(n);
+     }
+     typename std::unordered_set<Key, Hash, Pred>::iterator end() noexcept
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::end();
+     }
+     typename std::unordered_set<Key, Hash, Pred>::const_iterator end() const noexcept
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::end();
+     }
+     typename std::unordered_set<Key, Hash, Pred>::local_iterator end(size_t n)
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::end(n);
+     }
+     typename std::unordered_set<Key, Hash, Pred>::const_local_iterator end(size_t n) const
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::end(n);
+     }
+     /// @}
+     /// Search
+     typename std::unordered_set<Key, Hash, Pred>::const_iterator find(Key key)
+     {
+        fc::scoped_lock<fc::mutex> lock(mux);
+        return std::unordered_set<Key, Hash, Pred>::find(key);
+     }
+  };
+
   class blockchain_tied_message_cache
   {
     private:
@@ -481,9 +599,9 @@ namespace graphene { namespace net { namespace detail {
       /// used by the task that advertises inventory during normal operation
       // @{
-      fc::promise<void>::ptr            _retrigger_advertise_inventory_loop_promise;
-      fc::future<void>                  _advertise_inventory_loop_done;
-      std::unordered_set<item_id>       _new_inventory; /// list of items we have received but not yet advertised to our peers
+      fc::promise<void>::ptr            _retrigger_advertise_inventory_loop_promise;
+      fc::future<void>                  _advertise_inventory_loop_done;
+      concurrent_unordered_set<item_id> _new_inventory; /// list of items we have received but not yet advertised to our peers
       // @}

       fc::future<void> _terminate_inactive_connections_loop_done;
@@ -519,13 +637,13 @@ namespace graphene { namespace net { namespace detail {
       /** Stores all connections which have not yet finished key exchange or are still sending initial handshaking messages
        * back and forth (not yet ready to initiate syncing) */
-      std::unordered_set<peer_connection_ptr>       _handshaking_connections;
+      concurrent_unordered_set<peer_connection_ptr> _handshaking_connections;
       /** stores fully established connections we're either syncing with or in normal operation with */
-      std::unordered_set<peer_connection_ptr>       _active_connections;
+      concurrent_unordered_set<peer_connection_ptr> _active_connections;
       /** stores connections we've closed (sent closing message, not actually closed), but are still waiting for the remote end to close before we delete them */
-      std::unordered_set<peer_connection_ptr>       _closing_connections;
+      concurrent_unordered_set<peer_connection_ptr> _closing_connections;
       /** stores connections we've closed, but are still waiting for the OS to notify us that the socket is really closed */
-      std::unordered_set<peer_connection_ptr>       _terminating_connections;
+      concurrent_unordered_set<peer_connection_ptr> _terminating_connections;

       boost::circular_buffer<item_hash_t> _most_recent_blocks_accepted; // the /n/ most recent blocks we've accepted (currently tuned to the max number of connections)
@@ -854,16 +972,19 @@ namespace graphene { namespace net { namespace detail {
       ilog( "cleaning up node" );
       _node_is_shutting_down.store(true);

-      for (const peer_connection_ptr& active_peer : _active_connections)
       {
-        fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
-        if (inbound_endpoint)
+        fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
+        for (const peer_connection_ptr& active_peer : _active_connections)
         {
-          fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
-          if (updated_peer_record)
+          fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
+          if (inbound_endpoint)
           {
-            updated_peer_record->last_seen_time = fc::time_point::now();
-            _potential_peer_db.update_entry(*updated_peer_record);
+            fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
+            if (updated_peer_record)
+            {
+              updated_peer_record->last_seen_time = fc::time_point::now();
+              _potential_peer_db.update_entry(*updated_peer_record);
+            }
           }
         }
       }
@@ -1061,6 +1182,7 @@ namespace graphene { namespace net { namespace detail {
       std::set<item_hash_t> sync_items_to_request;

       // for each idle peer that we're syncing with
+      fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
       for( const peer_connection_ptr& peer : _active_connections )
       {
         if( peer->we_need_sync_items_from_peer &&
@@ -1119,6 +1241,7 @@ namespace graphene { namespace net { namespace detail {
     bool node_impl::is_item_in_any_peers_inventory(const item_id& item) const
     {
+      fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
       for( const peer_connection_ptr& peer : _active_connections )
       {
         if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
@@ -1158,9 +1281,13 @@ namespace graphene { namespace net { namespace detail {
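The wrapper keeps individual calls such as emplace, insert, erase, size and find internally synchronized, while iteration stays the caller's responsibility: the caller takes the container's own mutex via get_mutex() for the duration of the loop, which is the pattern the rest of this patch applies around every range-for over _active_connections and the other connection sets. The sketch below illustrates that division of responsibility using only standard-library types; std::mutex and std::lock_guard stand in for fc::mutex and fc::scoped_lock, the name guarded_set and the endpoint strings are illustrative rather than taken from the patch, and, unlike the class above, it leaves begin()/end() unlocked so a plain non-recursive std::mutex can be used.

    #include <iostream>
    #include <mutex>
    #include <string>
    #include <unordered_set>
    #include <utility>

    // Minimal standard-library analogue of the wrapper: single operations lock an
    // internal mutex; iteration requires the caller to hold the exposed mutex.
    template <class Key>
    class guarded_set : private std::unordered_set<Key>
    {
    private:
       mutable std::mutex mux;
    public:
       std::mutex& get_mutex() const { return mux; }

       std::pair<typename std::unordered_set<Key>::iterator, bool> insert(const Key& val)
       {
          std::lock_guard<std::mutex> lock(mux);
          return std::unordered_set<Key>::insert(val);
       }
       size_t erase(const Key& key)
       {
          std::lock_guard<std::mutex> lock(mux);
          return std::unordered_set<Key>::erase(key);
       }
       size_t size() const
       {
          std::lock_guard<std::mutex> lock(mux);
          return std::unordered_set<Key>::size();
       }
       // Iteration is exposed unlocked; callers take get_mutex() around the loop.
       using std::unordered_set<Key>::begin;
       using std::unordered_set<Key>::end;
    };

    int main()
    {
       guarded_set<std::string> active_connections;     // stands in for concurrent_unordered_set<peer_connection_ptr>
       active_connections.insert("203.0.113.10:1776");  // single call, locks internally
       active_connections.insert("203.0.113.11:1776");

       {
          // Iteration: hold the container's own mutex for the whole loop, mirroring
          // fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex()) in the patch.
          std::lock_guard<std::mutex> lock(active_connections.get_mutex());
          for (const auto& endpoint : active_connections)
             std::cout << "connected to " << endpoint << "\n";
       }

       active_connections.erase("203.0.113.10:1776");   // locks internally again
       return 0;
    }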
fetch_messages_to_send_set items_by_peer; // initialize the fetch_messages_to_send with an empty set of items for all idle peers - for (const peer_connection_ptr& peer : _active_connections) - if (peer->idle()) - items_by_peer.insert(peer_and_items_to_fetch(peer)); + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { + if (peer->idle()) + items_by_peer.insert(peer_and_items_to_fetch(peer)); + } + } // now loop over all items we want to fetch for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();) @@ -1262,56 +1389,59 @@ namespace graphene { namespace net { namespace detail { dlog("beginning an iteration of advertise inventory"); // swap inventory into local variable, clearing the node's copy std::unordered_set inventory_to_advertise; - inventory_to_advertise.swap(_new_inventory); + _new_inventory.swap(inventory_to_advertise); // process all inventory to advertise and construct the inventory messages we'll send // first, then send them all in a batch (to avoid any fiber interruption points while // we're computing the messages) std::list > inventory_messages_to_send; - for (const peer_connection_ptr& peer : _active_connections) { - // only advertise to peers who are in sync with us - idump((peer->peer_needs_sync_items_from_us)); - if( !peer->peer_needs_sync_items_from_us ) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - std::map > items_to_advertise_by_type; - // don't send the peer anything we've already advertised to it - // or anything it has advertised to us - // group the items we need to send by type, because we'll need to send one inventory message per type - unsigned total_items_to_send_to_this_peer = 0; - idump((inventory_to_advertise)); - for (const item_id& item_to_advertise : inventory_to_advertise) + // only advertise to peers who are in sync with us + idump((peer->peer_needs_sync_items_from_us)); + if( !peer->peer_needs_sync_items_from_us ) { - auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise); - auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise); + std::map > items_to_advertise_by_type; + // don't send the peer anything we've already advertised to it + // or anything it has advertised to us + // group the items we need to send by type, because we'll need to send one inventory message per type + unsigned total_items_to_send_to_this_peer = 0; + idump((inventory_to_advertise)); + for (const item_id& item_to_advertise : inventory_to_advertise) + { + auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise); + auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise); - if (adv_to_peer == peer->inventory_advertised_to_peer.end() && - adv_to_us == peer->inventory_peer_advertised_to_us.end()) - { - items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash); - peer->inventory_advertised_to_peer.insert(peer_connection::timestamped_item_id(item_to_advertise, fc::time_point::now())); - ++total_items_to_send_to_this_peer; - if (item_to_advertise.item_type == trx_message_type) - testnetlog("advertising transaction ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); - dlog("advertising item ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); - } - else - { - if (adv_to_peer != 
peer->inventory_advertised_to_peer.end() ) + if (adv_to_peer == peer->inventory_advertised_to_peer.end() && + adv_to_us == peer->inventory_peer_advertised_to_us.end()) + { + items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash); + peer->inventory_advertised_to_peer.insert(peer_connection::timestamped_item_id(item_to_advertise, fc::time_point::now())); + ++total_items_to_send_to_this_peer; + if (item_to_advertise.item_type == trx_message_type) + testnetlog("advertising transaction ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); + dlog("advertising item ${id} to peer ${endpoint}", ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint())); + } + else + { + if (adv_to_peer != peer->inventory_advertised_to_peer.end() ) idump( (*adv_to_peer) ); - if (adv_to_us != peer->inventory_peer_advertised_to_us.end() ) + if (adv_to_us != peer->inventory_peer_advertised_to_us.end() ) idump( (*adv_to_us) ); + } } - } dlog("advertising ${count} new item(s) of ${types} type(s) to peer ${endpoint}", ("count", total_items_to_send_to_this_peer) - ("types", items_to_advertise_by_type.size()) - ("endpoint", peer->get_remote_endpoint())); - for (auto items_group : items_to_advertise_by_type) - inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second))); + ("types", items_to_advertise_by_type.size()) + ("endpoint", peer->get_remote_endpoint())); + for (auto items_group : items_to_advertise_by_type) + inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second))); + } + peer->clear_old_inventory(); } - peer->clear_old_inventory(); } for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter) @@ -1360,25 +1490,30 @@ namespace graphene { namespace net { namespace detail { uint32_t handshaking_timeout = _peer_inactivity_timeout; fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout); - for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) - if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && - handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && - handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold ) + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) { - wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds", - ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) ); - wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", - ("status", handshaking_peer->negotiation_status) - ("sent", handshaking_peer->get_total_bytes_sent()) - ("received", handshaking_peer->get_total_bytes_received())); - handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, "Terminating handshaking connection due to inactivity of ${timeout} seconds. 
Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", - ("peer", handshaking_peer->get_remote_endpoint()) - ("timeout", handshaking_timeout) - ("status", handshaking_peer->negotiation_status) - ("sent", handshaking_peer->get_total_bytes_sent()) - ("received", handshaking_peer->get_total_bytes_received()))); - peers_to_disconnect_forcibly.push_back( handshaking_peer ); + if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && + handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && + handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold ) + { + wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds", + ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) ); + wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", + ("status", handshaking_peer->negotiation_status) + ("sent", handshaking_peer->get_total_bytes_sent()) + ("received", handshaking_peer->get_total_bytes_received())); + handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, "Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", + ("peer", handshaking_peer->get_remote_endpoint()) + ("timeout", handshaking_timeout) + ("status", handshaking_peer->negotiation_status) + ("sent", handshaking_peer->get_total_bytes_sent()) + ("received", handshaking_peer->get_total_bytes_received()))); + peers_to_disconnect_forcibly.push_back( handshaking_peer ); + } } + } // timeout for any active peers is two block intervals uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds; @@ -1398,94 +1533,103 @@ namespace graphene { namespace net { namespace detail { fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout); fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout); fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout; - for( const peer_connection_ptr& active_peer : _active_connections ) { - if( active_peer->connection_initiation_time < active_disconnect_threshold && - active_peer->get_last_message_received_time() < active_disconnect_threshold ) + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& active_peer : _active_connections ) { - wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds", - ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) ); - peers_to_disconnect_gently.push_back( active_peer ); - } - else - { - bool disconnect_due_to_request_timeout = false; - for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->sync_items_requested_from_peer) - if (item_and_time.second < active_ignored_request_threshold) - { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ${id}", - ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); - disconnect_due_to_request_timeout = true; - break; - } - if (!disconnect_due_to_request_timeout && - active_peer->item_ids_requested_from_peer && - active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) - { - 
wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}", - ("peer", active_peer->get_remote_endpoint()) - ("synopsis", active_peer->item_ids_requested_from_peer->get<0>())); - disconnect_due_to_request_timeout = true; - } - if (!disconnect_due_to_request_timeout) - for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer) + if( active_peer->connection_initiation_time < active_disconnect_threshold && + active_peer->get_last_message_received_time() < active_disconnect_threshold ) + { + wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds", + ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) ); + peers_to_disconnect_gently.push_back( active_peer ); + } + else + { + bool disconnect_due_to_request_timeout = false; + for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->sync_items_requested_from_peer) if (item_and_time.second < active_ignored_request_threshold) { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}", - ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); + wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ${id}", + ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); disconnect_due_to_request_timeout = true; break; } - if (disconnect_due_to_request_timeout) - { - // we should probably disconnect nicely and give them a reason, but right now the logic - // for rescheduling the requests only executes when the connection is fully closed, - // and we want to get those requests rescheduled as soon as possible - peers_to_disconnect_forcibly.push_back(active_peer); - } - else if (active_peer->connection_initiation_time < active_send_keepalive_threshold && - active_peer->get_last_message_received_time() < active_send_keepalive_threshold) - { - wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", - ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); - peers_to_send_keep_alive.push_back(active_peer); - } - else if (active_peer->we_need_sync_items_from_peer && - !active_peer->is_currently_handling_message() && - !active_peer->item_ids_requested_from_peer && - active_peer->ids_of_items_to_get.empty()) - { - // This is a state we should never get into in the first place, but if we do, we should disconnect the peer - // to re-establish the connection. 
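The inactivity and timeout scans in this function only record which peers to act on (peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, peers_to_terminate) while the relevant container's mutex is held; the disconnects themselves run after the scoped blocks have released the locks. The following standalone sketch shows that collect-then-act shape with standard-library types only; every name in it (peers_mutex, last_message_received, the timeouts and endpoints) is illustrative, not taken from the patch.

    #include <chrono>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <unordered_map>
    #include <vector>

    int main()
    {
       using clock = std::chrono::steady_clock;

       std::mutex peers_mutex;   // stands in for _active_connections.get_mutex()
       std::unordered_map<std::string, clock::time_point> last_message_received{
          { "10.0.0.1:1776", clock::now() },
          { "10.0.0.2:1776", clock::now() - std::chrono::minutes(10) } };

       const auto disconnect_threshold = clock::now() - std::chrono::minutes(5);

       std::vector<std::string> peers_to_disconnect;
       {
          std::lock_guard<std::mutex> lock(peers_mutex);   // held only for the scan
          for (const auto& peer : last_message_received)
             if (peer.second < disconnect_threshold)
                peers_to_disconnect.push_back(peer.first);
       }                                                   // lock released here

       // The actual disconnect work (which may block or yield) runs with no lock held.
       for (const auto& endpoint : peers_to_disconnect)
          std::cout << "closing connection to inactive peer " << endpoint << "\n";
       return 0;
    }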
- fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", - ("peer", active_peer->get_remote_endpoint())); - wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", - ("peer", active_peer->get_remote_endpoint())); - peers_to_disconnect_forcibly.push_back(active_peer); + if (!disconnect_due_to_request_timeout && + active_peer->item_ids_requested_from_peer && + active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}", + ("peer", active_peer->get_remote_endpoint()) + ("synopsis", active_peer->item_ids_requested_from_peer->get<0>())); + disconnect_due_to_request_timeout = true; + } + if (!disconnect_due_to_request_timeout) + for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer) + if (item_and_time.second < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}", + ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); + disconnect_due_to_request_timeout = true; + break; + } + if (disconnect_due_to_request_timeout) + { + // we should probably disconnect nicely and give them a reason, but right now the logic + // for rescheduling the requests only executes when the connection is fully closed, + // and we want to get those requests rescheduled as soon as possible + peers_to_disconnect_forcibly.push_back(active_peer); + } + else if (active_peer->connection_initiation_time < active_send_keepalive_threshold && + active_peer->get_last_message_received_time() < active_send_keepalive_threshold) + { + wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", + ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); + peers_to_send_keep_alive.push_back(active_peer); + } + else if (active_peer->we_need_sync_items_from_peer && + !active_peer->is_currently_handling_message() && + !active_peer->item_ids_requested_from_peer && + active_peer->ids_of_items_to_get.empty()) + { + // This is a state we should never get into in the first place, but if we do, we should disconnect the peer + // to re-establish the connection. + fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(active_peer); + } } } } fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT); - for( const peer_connection_ptr& closing_peer : _closing_connections ) - if( closing_peer->connection_closed_time < closing_disconnect_threshold ) - { - // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT - // seconds ago, but they haven't done it yet. 
Terminate the connection now - wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner", - ( "peer", closing_peer->get_remote_endpoint() ) ); - peers_to_disconnect_forcibly.push_back( closing_peer ); + { + fc::scoped_lock lock(_closing_connections.get_mutex()); + for( const peer_connection_ptr& closing_peer : _closing_connections ) { + if (closing_peer->connection_closed_time < closing_disconnect_threshold) { + // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT + // seconds ago, but they haven't done it yet. Terminate the connection now + wlog("Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner", + ("peer", closing_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(closing_peer); + } } + } uint32_t failed_terminate_timeout_seconds = 120; fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds); - for (const peer_connection_ptr& peer : _terminating_connections ) - if (peer->get_connection_terminated_time() != fc::time_point::min() && - peer->get_connection_terminated_time() < failed_terminate_threshold) - { - wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint())); - peers_to_terminate.push_back(peer); + { + fc::scoped_lock lock(_terminating_connections.get_mutex()); + for (const peer_connection_ptr& peer : _terminating_connections ) { + if (peer->get_connection_terminated_time() != fc::time_point::min() && + peer->get_connection_terminated_time() < failed_terminate_threshold) { + wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint())); + peers_to_terminate.push_back(peer); + } } + } // That's the end of the sorting step; now all peers that require further processing are now in one of the // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate @@ -1493,11 +1637,14 @@ namespace graphene { namespace net { namespace detail { // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield, // and once we start yielding, we may find that we've moved that peer to another list (closed or active) // and that triggers assertions, maybe even errors - for (const peer_connection_ptr& peer : peers_to_terminate ) { - assert(_terminating_connections.find(peer) != _terminating_connections.end()); - _terminating_connections.erase(peer); - schedule_peer_for_deletion(peer); + fc::scoped_lock lock(_terminating_connections.get_mutex()); + for (const peer_connection_ptr& peer : peers_to_terminate ) + { + assert(_terminating_connections.find(peer) != _terminating_connections.end()); + _terminating_connections.erase(peer); + schedule_peer_for_deletion(peer); + } } peers_to_terminate.clear(); @@ -1516,6 +1663,7 @@ namespace graphene { namespace net { namespace detail { // disconnect reason, so it may yield) for( const peer_connection_ptr& peer : peers_to_disconnect_gently ) { + fc::scoped_lock lock(_active_connections.get_mutex()); fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity", ( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - fc::time_point::now() ).count() / fc::seconds(1 
).count() ) @@ -1539,6 +1687,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); + fc::scoped_lock lock(_active_connections.get_mutex()); std::list original_active_peers(_active_connections.begin(), _active_connections.end()); for( const peer_connection_ptr& active_peer : original_active_peers ) { @@ -1710,12 +1859,19 @@ namespace graphene { namespace net { namespace detail { peer_connection_ptr node_impl::get_peer_by_node_id(const node_id_t& node_id) { - for (const peer_connection_ptr& active_peer : _active_connections) - if (node_id == active_peer->node_id) - return active_peer; - for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) - if (node_id == handshaking_peer->node_id) - return handshaking_peer; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_peer : _active_connections) + if (node_id == active_peer->node_id) + return active_peer; + } + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) + if (node_id == handshaking_peer->node_id) + return handshaking_peer; + } + return peer_connection_ptr(); } @@ -1727,18 +1883,25 @@ namespace graphene { namespace net { namespace detail { dlog("is_already_connected_to_id returning true because the peer is us"); return true; } - for (const peer_connection_ptr active_peer : _active_connections) - if (node_id == active_peer->node_id) - { - dlog("is_already_connected_to_id returning true because the peer is already in our active list"); - return true; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr active_peer : _active_connections) { + if (node_id == active_peer->node_id) { + dlog("is_already_connected_to_id returning true because the peer is already in our active list"); + return true; + } } - for (const peer_connection_ptr handshaking_peer : _handshaking_connections) - if (node_id == handshaking_peer->node_id) - { - dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list"); - return true; + } + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr handshaking_peer : _handshaking_connections) { + if (node_id == handshaking_peer->node_id) { + dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list"); + return true; + } } + } + return false; } @@ -1770,19 +1933,25 @@ namespace graphene { namespace net { namespace detail { ("max", _maximum_number_of_connections)); dlog(" my id is ${id}", ("id", _node_id)); - for (const peer_connection_ptr& active_connection : _active_connections) { - dlog(" active: ${endpoint} with ${id} [${direction}]", - ("endpoint", active_connection->get_remote_endpoint()) - ("id", active_connection->node_id) - ("direction", active_connection->direction)); + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_connection : _active_connections) + { + dlog(" active: ${endpoint} with ${id} [${direction}]", + ("endpoint", active_connection->get_remote_endpoint()) + ("id", active_connection->node_id) + ("direction", active_connection->direction)); + } } - for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) { - dlog(" handshaking: ${endpoint} with ${id} [${direction}]", - ("endpoint", handshaking_connection->get_remote_endpoint()) - ("id", handshaking_connection->node_id) - ("direction", 
handshaking_connection->direction)); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) + { + dlog(" handshaking: ${endpoint} with ${id} [${direction}]", + ("endpoint", handshaking_connection->get_remote_endpoint()) + ("id", handshaking_connection->node_id) + ("direction", handshaking_connection->direction)); + } } } @@ -2229,6 +2398,7 @@ namespace graphene { namespace net { namespace detail { if (!_peer_advertising_disabled) { reply.addresses.reserve(_active_connections.size()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& active_peer : _active_connections) { fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint()); @@ -2414,11 +2584,14 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); uint32_t max_number_of_unfetched_items = 0; - for( const peer_connection_ptr& peer : _active_connections ) { - uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids; - max_number_of_unfetched_items = std::max(max_number_of_unfetched_items, - this_peer_number_of_unfetched_items); + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& peer : _active_connections ) + { + uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids; + max_number_of_unfetched_items = std::max(max_number_of_unfetched_items, + this_peer_number_of_unfetched_items); + } } return max_number_of_unfetched_items; } @@ -2633,17 +2806,19 @@ namespace graphene { namespace net { namespace detail { originating_peer->ids_of_items_to_get.empty()) { bool is_first_item_for_other_peer = false; - for (const peer_connection_ptr& peer : _active_connections) - if (peer != originating_peer->shared_from_this() && - !peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - { - dlog("The item ${newitem} is the first item for peer ${peer}", - ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - ("peer", peer->get_remote_endpoint())); - is_first_item_for_other_peer = true; - break; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { + if (peer != originating_peer->shared_from_this() && + !peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) { + dlog("The item ${newitem} is the first item for peer ${peer}", + ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())("peer", peer->get_remote_endpoint())); + is_first_item_for_other_peer = true; + break; + } } + } dlog("is_first_item_for_other_peer: ${is_first}. 
item_hashes_received.size() = ${size}", ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); if (!is_first_item_for_other_peer) @@ -2933,15 +3108,18 @@ namespace graphene { namespace net { namespace detail { item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash); bool we_advertised_this_item_to_a_peer = false; bool we_requested_this_item_from_a_peer = false; - for (const peer_connection_ptr peer : _active_connections) { - if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end()) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr peer : _active_connections) { - we_advertised_this_item_to_a_peer = true; - break; + if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end()) + { + we_advertised_this_item_to_a_peer = true; + break; + } + if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end()) + we_requested_this_item_from_a_peer = true; } - if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end()) - we_requested_this_item_from_a_peer = true; } // if we have already advertised it to a peer, we must have it, no need to do anything else @@ -3172,6 +3350,7 @@ namespace graphene { namespace net { namespace detail { }; bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -3254,6 +3433,7 @@ namespace graphene { namespace net { namespace detail { else { // invalid message received + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -3356,15 +3536,18 @@ namespace graphene { namespace net { namespace detail { // find out if this block is the next block on the active chain or one of the forks bool potential_first_block = false; - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - if (!peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == received_block_iter->block_id) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - potential_first_block = true; - peer->ids_of_items_to_get.pop_front(); - peer->ids_of_items_being_processed.insert(received_block_iter->block_id); + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections + if (!peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == received_block_iter->block_id) + { + potential_first_block = true; + peer->ids_of_items_to_get.pop_front(); + peer->ids_of_items_being_processed.insert(received_block_iter->block_id); + } } } @@ -3392,6 +3575,7 @@ namespace graphene { namespace net { namespace detail { { dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); std::vector< peer_connection_ptr > peers_needing_next_batch; + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { auto items_being_processed_iter = 
peer->ids_of_items_being_processed.find(received_block_iter->block_id); @@ -3527,55 +3711,62 @@ namespace graphene { namespace net { namespace detail { fc::time_point_sec block_time = block_message_to_process.block.timestamp; bool disconnect_this_peer = false; - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - - auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id); - if (iter != peer->inventory_peer_advertised_to_us.end()) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - // this peer offered us the item. It will eventually expire from the peer's - // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes). - // For now, it will remain there, which will prevent us from offering the peer this - // block back when we rebroadcast the block below - peer->last_block_delegate_has_seen = block_message_to_process.block_id; - peer->last_block_time_delegate_has_seen = block_time; + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections + + auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id); + if (iter != peer->inventory_peer_advertised_to_us.end()) + { + // this peer offered us the item. It will eventually expire from the peer's + // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes). + // For now, it will remain there, which will prevent us from offering the peer this + // block back when we rebroadcast the block below + peer->last_block_delegate_has_seen = block_message_to_process.block_id; + peer->last_block_time_delegate_has_seen = block_time; + } + peer->clear_old_inventory(); } - peer->clear_old_inventory(); } + message_propagation_data propagation_data{message_receive_time, message_validated_time, originating_peer->node_id}; broadcast( block_message_to_process, propagation_data ); _message_cache.block_accepted(); - for (const peer_connection_ptr& peer : _active_connections) { - if (is_hard_fork_block(block_number) ) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - if (peer->last_known_fork_block_number != 0) + if (is_hard_fork_block(block_number) ) { - uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number); - if (next_fork_block_number != 0 && - next_fork_block_number <= block_number) + if (peer->last_known_fork_block_number != 0) + { + uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number); + if (next_fork_block_number != 0 && + next_fork_block_number <= block_number) + { + disconnect_this_peer = true; + } + } + } + + if(peer->last_known_hardfork_time < _delegate->get_last_known_hardfork_time()) + { + if(block_message_to_process.block.timestamp.sec_since_epoch() >= _delegate->get_last_known_hardfork_time().sec_since_epoch()) { disconnect_this_peer = true; } } - } - if(peer->last_known_hardfork_time < _delegate->get_last_known_hardfork_time()) - { - if(block_message_to_process.block.timestamp.sec_since_epoch() >= _delegate->get_last_known_hardfork_time().sec_since_epoch()) + if( disconnect_this_peer ) { - disconnect_this_peer = true; - } - } - - if( disconnect_this_peer ) - { - peers_to_disconnect.insert(peer); + peers_to_disconnect.insert(peer); #ifdef ENABLE_DEBUG_ULOGS - ulog("Disconnecting from 
peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp)); + ulog("Disconnecting from peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp)); #endif + } } } @@ -3614,9 +3805,11 @@ namespace graphene { namespace net { namespace detail { disconnect_reason = "You offered me a block that I have deemed to be invalid"; peers_to_disconnect.insert( originating_peer->shared_from_this() ); - for (const peer_connection_ptr& peer : _active_connections) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id) peers_to_disconnect.insert(peer); + } } if (restart_sync_exception) @@ -3737,25 +3930,29 @@ namespace graphene { namespace net { namespace detail { void node_impl::forward_firewall_check_to_next_available_peer(firewall_check_state_data* firewall_check_state) { - for (const peer_connection_ptr& peer : _active_connections) { - if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test - !peer->firewall_check_state && // the peer isn't already performing a check for another node - firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() && - peer->core_protocol_version >= 106) + fc::scoped_lock lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - wlog("forwarding firewall check for node ${to_check} to peer ${checker}", - ("to_check", firewall_check_state->endpoint_to_test) - ("checker", peer->get_remote_endpoint())); - firewall_check_state->nodes_already_tested.insert(peer->node_id); - peer->firewall_check_state = firewall_check_state; - check_firewall_message check_request; - check_request.endpoint_to_check = firewall_check_state->endpoint_to_test; - check_request.node_id = firewall_check_state->expected_node_id; - peer->send_message(check_request); - return; + if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test + !peer->firewall_check_state && // the peer isn't already performing a check for another node + firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() && + peer->core_protocol_version >= 106) + { + wlog("forwarding firewall check for node ${to_check} to peer ${checker}", + ("to_check", firewall_check_state->endpoint_to_test) + ("checker", peer->get_remote_endpoint())); + firewall_check_state->nodes_already_tested.insert(peer->node_id); + peer->firewall_check_state = firewall_check_state; + check_firewall_message check_request; + check_request.endpoint_to_check = firewall_check_state->endpoint_to_test; + check_request.node_id = firewall_check_state->expected_node_id; + peer->send_message(check_request); + return; + } } } + wlog("Unable to forward firewall check for node ${to_check} to any other peers, returning 'unable'", ("to_check", firewall_check_state->endpoint_to_test)); @@ -3928,41 +4125,45 @@ namespace graphene { namespace net { namespace detail { } fc::time_point now = fc::time_point::now(); - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections + fc::scoped_lock lock(_active_connections.get_mutex()); + for 
(const peer_connection_ptr& peer : _active_connections) + { + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - current_connection_data data_for_this_peer; - data_for_this_peer.connection_duration = now.sec_since_epoch() - peer->connection_initiation_time.sec_since_epoch(); - if (peer->get_remote_endpoint()) // should always be set for anyone we're actively connected to - data_for_this_peer.remote_endpoint = *peer->get_remote_endpoint(); - data_for_this_peer.clock_offset = peer->clock_offset; - data_for_this_peer.round_trip_delay = peer->round_trip_delay; - data_for_this_peer.node_id = peer->node_id; - data_for_this_peer.connection_direction = peer->direction; - data_for_this_peer.firewalled = peer->is_firewalled; - fc::mutable_variant_object user_data; - if (peer->graphene_git_revision_sha) - user_data["graphene_git_revision_sha"] = *peer->graphene_git_revision_sha; - if (peer->graphene_git_revision_unix_timestamp) - user_data["graphene_git_revision_unix_timestamp"] = *peer->graphene_git_revision_unix_timestamp; - if (peer->fc_git_revision_sha) - user_data["fc_git_revision_sha"] = *peer->fc_git_revision_sha; - if (peer->fc_git_revision_unix_timestamp) - user_data["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp; - if (peer->platform) - user_data["platform"] = *peer->platform; - if (peer->bitness) - user_data["bitness"] = *peer->bitness; - user_data["user_agent"] = peer->user_agent; + current_connection_data data_for_this_peer; + data_for_this_peer.connection_duration = now.sec_since_epoch() - peer->connection_initiation_time.sec_since_epoch(); + if (peer->get_remote_endpoint()) // should always be set for anyone we're actively connected to + data_for_this_peer.remote_endpoint = *peer->get_remote_endpoint(); + data_for_this_peer.clock_offset = peer->clock_offset; + data_for_this_peer.round_trip_delay = peer->round_trip_delay; + data_for_this_peer.node_id = peer->node_id; + data_for_this_peer.connection_direction = peer->direction; + data_for_this_peer.firewalled = peer->is_firewalled; + fc::mutable_variant_object user_data; + if (peer->graphene_git_revision_sha) + user_data["graphene_git_revision_sha"] = *peer->graphene_git_revision_sha; + if (peer->graphene_git_revision_unix_timestamp) + user_data["graphene_git_revision_unix_timestamp"] = *peer->graphene_git_revision_unix_timestamp; + if (peer->fc_git_revision_sha) + user_data["fc_git_revision_sha"] = *peer->fc_git_revision_sha; + if (peer->fc_git_revision_unix_timestamp) + user_data["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp; + if (peer->platform) + user_data["platform"] = *peer->platform; + if (peer->bitness) + user_data["bitness"] = *peer->bitness; + user_data["user_agent"] = peer->user_agent; - user_data["last_known_block_hash"] = fc::variant( peer->last_block_delegate_has_seen, 1 ); - user_data["last_known_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen); - user_data["last_known_block_time"] = peer->last_block_time_delegate_has_seen; + user_data["last_known_block_hash"] = fc::variant( peer->last_block_delegate_has_seen, 1 ); + user_data["last_known_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen); + user_data["last_known_block_time"] = peer->last_block_time_delegate_has_seen; - data_for_this_peer.user_data = user_data; - reply.current_connections.emplace_back(data_for_this_peer); + data_for_this_peer.user_data = user_data; + 
reply.current_connections.emplace_back(data_for_this_peer); + } } + originating_peer->send_message(reply); } @@ -4047,6 +4248,7 @@ namespace graphene { namespace net { namespace detail { void node_impl::start_synchronizing() { + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) start_synchronizing_with_peer( peer ); } @@ -4253,9 +4455,19 @@ namespace graphene { namespace net { namespace detail { // the read loop before it gets an EOF). // operate off copies of the lists in case they change during iteration std::list all_peers; - boost::push_back(all_peers, _active_connections); - boost::push_back(all_peers, _handshaking_connections); - boost::push_back(all_peers, _closing_connections); + auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); }; + { + fc::scoped_lock lock(_active_connections.get_mutex()); + std::for_each(_active_connections.begin(), _active_connections.end(), p_back); + } + { + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back); + } + { + fc::scoped_lock lock(_closing_connections.get_mutex()); + std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back); + } for (const peer_connection_ptr& peer : all_peers) { @@ -4521,9 +4733,7 @@ namespace graphene { namespace net { namespace detail { // whether the peer is firewalled, we want to disconnect now. _handshaking_connections.erase(new_peer); _terminating_connections.erase(new_peer); - assert(_active_connections.find(new_peer) == _active_connections.end()); _active_connections.erase(new_peer); - assert(_closing_connections.find(new_peer) == _closing_connections.end()); _closing_connections.erase(new_peer); display_current_connections(); @@ -4867,18 +5077,25 @@ namespace graphene { namespace net { namespace detail { peer_connection_ptr node_impl::get_connection_to_endpoint( const fc::ip::endpoint& remote_endpoint ) { VERIFY_CORRECT_THREAD(); - for( const peer_connection_ptr& active_peer : _active_connections ) { - fc::optional endpoint_for_this_peer( active_peer->get_remote_endpoint() ); - if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) - return active_peer; + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& active_peer : _active_connections ) + { + fc::optional endpoint_for_this_peer( active_peer->get_remote_endpoint() ); + if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) + return active_peer; + } } - for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) { - fc::optional endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() ); - if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) - return handshaking_peer; + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) + { + fc::optional endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() ); + if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) + return handshaking_peer; + } } + return peer_connection_ptr(); } @@ -4922,21 +5139,27 @@ namespace graphene { namespace net { namespace detail { ilog( " number of peers: ${active} active, ${handshaking}, ${closing} closing. 
attempting to maintain ${desired} - ${maximum} peers", ( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() ) ( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) ); - for( const peer_connection_ptr& peer : _active_connections ) { - ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}", - ( "endpoint", peer->get_remote_endpoint() ) - ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) ); - if( peer->we_need_sync_items_from_peer ) - ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) ); - if (peer->inhibit_fetching_sync_blocks) - ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" ); + fc::scoped_lock lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& peer : _active_connections ) + { + ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}", + ( "endpoint", peer->get_remote_endpoint() ) + ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) ); + if( peer->we_need_sync_items_from_peer ) + ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) ); + if (peer->inhibit_fetching_sync_blocks) + ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" ); + } } - for( const peer_connection_ptr& peer : _handshaking_connections ) { - ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})", - ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) ); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr& peer : _handshaking_connections ) + { + ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})", + ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) ); + } } ilog( "--------- MEMORY USAGE ------------" ); @@ -4946,6 +5169,7 @@ namespace graphene { namespace net { namespace detail { ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) ); ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) ); ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) ); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) ); @@ -5043,6 +5267,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); std::vector statuses; + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -5233,10 +5458,13 @@ namespace graphene { namespace net { namespace detail { _allowed_peers.clear(); _allowed_peers.insert(allowed_peers.begin(), allowed_peers.end()); std::list peers_to_disconnect; - if (!_allowed_peers.empty()) - for (const peer_connection_ptr& peer : _active_connections) + if (!_allowed_peers.empty()) { + fc::scoped_lock 
lock(_active_connections.get_mutex()); + for (const peer_connection_ptr &peer : _active_connections) { if (_allowed_peers.find(peer->node_id) == _allowed_peers.end()) peers_to_disconnect.push_back(peer); + } + } for (const peer_connection_ptr& peer : peers_to_disconnect) disconnect_from_peer(peer.get(), "My allowed_peers list has changed, and you're no longer allowed. Bye."); #endif // ENABLE_P2P_DEBUGGING_API
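
Lookups such as get_peer_by_node_id, is_already_connected_to_id and get_connection_to_endpoint above take each connection set's mutex in its own scoped block and return as soon as a match is found, so no two of the container mutexes are ever held at the same time. A standalone sketch of that shape with standard-library types follows; peer_sets, find_connection and the endpoint strings are illustrative names, not part of the patch.

    #include <mutex>
    #include <optional>
    #include <string>
    #include <unordered_set>

    // Search two guarded containers, locking each one in its own scoped block so
    // the two mutexes are never held together.
    struct peer_sets
    {
       std::mutex active_mutex, handshaking_mutex;
       std::unordered_set<std::string> active, handshaking;

       std::optional<std::string> find_connection(const std::string& endpoint)
       {
          {
             std::lock_guard<std::mutex> lock(active_mutex);       // released at the closing brace
             if (active.count(endpoint) != 0)
                return endpoint;
          }
          {
             std::lock_guard<std::mutex> lock(handshaking_mutex);  // taken only after the first lock is gone
             if (handshaking.count(endpoint) != 0)
                return endpoint;
          }
          return std::nullopt;                                     // not connected at all
       }
    };

    int main()
    {
       peer_sets sets;
       sets.active.insert("198.51.100.7:1776");   // single-threaded setup, no lock needed yet
       return sets.find_connection("198.51.100.7:1776").has_value() ? 0 : 1;
    }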