Compare commits
3 commits
master
...
bug/501-on
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fba1c0cb95 | ||
|
|
5e75e8043a | ||
|
|
5203869a9d |
1 changed files with 63 additions and 67 deletions
|
|
@ -559,14 +559,14 @@ namespace graphene { namespace net { namespace detail {
|
||||||
|
|
||||||
peer_database _potential_peer_db;
|
peer_database _potential_peer_db;
|
||||||
fc::promise<void>::ptr _retrigger_connect_loop_promise;
|
fc::promise<void>::ptr _retrigger_connect_loop_promise;
|
||||||
bool _potential_peer_database_updated;
|
std::atomic_bool _potential_peer_database_updated;
|
||||||
fc::future<void> _p2p_network_connect_loop_done;
|
fc::future<void> _p2p_network_connect_loop_done;
|
||||||
// @}
|
// @}
|
||||||
|
|
||||||
/// used by the task that fetches sync items during synchronization
|
/// used by the task that fetches sync items during synchronization
|
||||||
// @{
|
// @{
|
||||||
fc::promise<void>::ptr _retrigger_fetch_sync_items_loop_promise;
|
fc::promise<void>::ptr _retrigger_fetch_sync_items_loop_promise;
|
||||||
bool _sync_items_to_fetch_updated;
|
std::atomic_bool _sync_items_to_fetch_updated;
|
||||||
fc::future<void> _fetch_sync_items_loop_done;
|
fc::future<void> _fetch_sync_items_loop_done;
|
||||||
|
|
||||||
typedef std::unordered_map<graphene::net::block_id_type, fc::time_point> active_sync_requests_map;
|
typedef std::unordered_map<graphene::net::block_id_type, fc::time_point> active_sync_requests_map;
|
||||||
|
|
@ -576,13 +576,13 @@ namespace graphene { namespace net { namespace detail {
|
||||||
std::list<graphene::net::block_message> _received_sync_items; /// list of sync blocks we've received, but can't yet process because we are still missing blocks that come earlier in the chain
|
std::list<graphene::net::block_message> _received_sync_items; /// list of sync blocks we've received, but can't yet process because we are still missing blocks that come earlier in the chain
|
||||||
// @}
|
// @}
|
||||||
|
|
||||||
fc::future<void> _process_backlog_of_sync_blocks_done;
|
fc::future<void> _process_backlog_of_sync_blocks_done;
|
||||||
bool _suspend_fetching_sync_blocks;
|
std::atomic_bool _suspend_fetching_sync_blocks;
|
||||||
|
|
||||||
/// used by the task that fetches items during normal operation
|
/// used by the task that fetches items during normal operation
|
||||||
// @{
|
// @{
|
||||||
fc::promise<void>::ptr _retrigger_fetch_item_loop_promise;
|
fc::promise<void>::ptr _retrigger_fetch_item_loop_promise;
|
||||||
bool _items_to_fetch_updated;
|
std::atomic_bool _items_to_fetch_updated;
|
||||||
fc::future<void> _fetch_item_loop_done;
|
fc::future<void> _fetch_item_loop_done;
|
||||||
|
|
||||||
struct item_id_index{};
|
struct item_id_index{};
|
||||||
|
|
@ -1687,23 +1687,18 @@ namespace graphene { namespace net { namespace detail {
|
||||||
{
|
{
|
||||||
VERIFY_CORRECT_THREAD();
|
VERIFY_CORRECT_THREAD();
|
||||||
|
|
||||||
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
|
|
||||||
std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(), _active_connections.end());
|
|
||||||
for( const peer_connection_ptr& active_peer : original_active_peers )
|
|
||||||
{
|
{
|
||||||
try
|
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
|
||||||
{
|
for (const peer_connection_ptr &active_peer : _active_connections) {
|
||||||
active_peer->send_message(address_request_message());
|
try {
|
||||||
}
|
active_peer->send_message(address_request_message());
|
||||||
catch ( const fc::canceled_exception& )
|
} catch (const fc::canceled_exception &) {
|
||||||
{
|
throw;
|
||||||
throw;
|
} catch (const fc::exception &e) {
|
||||||
}
|
dlog("Caught exception while sending address request message to peer ${peer} : ${e}",
|
||||||
catch (const fc::exception& e)
|
("peer", active_peer->get_remote_endpoint())("e", e));
|
||||||
{
|
}
|
||||||
dlog("Caught exception while sending address request message to peer ${peer} : ${e}",
|
}
|
||||||
("peer", active_peer->get_remote_endpoint())("e", e));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// this has nothing to do with updating the peer list, but we need to prune this list
|
// this has nothing to do with updating the peer list, but we need to prune this list
|
||||||
|
|
@ -1718,6 +1713,7 @@ namespace graphene { namespace net { namespace detail {
|
||||||
fc::time_point::now() + fc::minutes(15),
|
fc::time_point::now() + fc::minutes(15),
|
||||||
"fetch_updated_peer_lists_loop" );
|
"fetch_updated_peer_lists_loop" );
|
||||||
}
|
}
|
||||||
|
|
||||||
void node_impl::update_bandwidth_data(uint32_t bytes_read_this_second, uint32_t bytes_written_this_second)
|
void node_impl::update_bandwidth_data(uint32_t bytes_read_this_second, uint32_t bytes_written_this_second)
|
||||||
{
|
{
|
||||||
VERIFY_CORRECT_THREAD();
|
VERIFY_CORRECT_THREAD();
|
||||||
|
|
@ -3215,35 +3211,6 @@ namespace graphene { namespace net { namespace detail {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_closing_connections.erase(originating_peer_ptr);
|
|
||||||
_handshaking_connections.erase(originating_peer_ptr);
|
|
||||||
_terminating_connections.erase(originating_peer_ptr);
|
|
||||||
if (_active_connections.find(originating_peer_ptr) != _active_connections.end())
|
|
||||||
{
|
|
||||||
_active_connections.erase(originating_peer_ptr);
|
|
||||||
|
|
||||||
if (inbound_endpoint && originating_peer_ptr->get_remote_endpoint())
|
|
||||||
{
|
|
||||||
fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
|
|
||||||
if (updated_peer_record)
|
|
||||||
{
|
|
||||||
updated_peer_record->last_seen_time = fc::time_point::now();
|
|
||||||
_potential_peer_db.update_entry(*updated_peer_record);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ilog("Remote peer ${endpoint} closed their connection to us", ("endpoint", originating_peer->get_remote_endpoint()));
|
|
||||||
display_current_connections();
|
|
||||||
trigger_p2p_network_connect_loop();
|
|
||||||
|
|
||||||
// notify the node delegate so it can update the display
|
|
||||||
if( _active_connections.size() != _last_reported_number_of_connections )
|
|
||||||
{
|
|
||||||
_last_reported_number_of_connections = (uint32_t)_active_connections.size();
|
|
||||||
_delegate->connection_count_changed( _last_reported_number_of_connections );
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we had delegated a firewall check to this peer, send it to another peer
|
// if we had delegated a firewall check to this peer, send it to another peer
|
||||||
if (originating_peer->firewall_check_state)
|
if (originating_peer->firewall_check_state)
|
||||||
{
|
{
|
||||||
|
|
@ -3281,6 +3248,36 @@ namespace graphene { namespace net { namespace detail {
|
||||||
trigger_fetch_items_loop();
|
trigger_fetch_items_loop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_active_connections.find(originating_peer_ptr) != _active_connections.end())
|
||||||
|
{
|
||||||
|
_active_connections.erase(originating_peer_ptr);
|
||||||
|
|
||||||
|
if (inbound_endpoint && originating_peer_ptr->get_remote_endpoint())
|
||||||
|
{
|
||||||
|
fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
|
||||||
|
if (updated_peer_record)
|
||||||
|
{
|
||||||
|
updated_peer_record->last_seen_time = fc::time_point::now();
|
||||||
|
_potential_peer_db.update_entry(*updated_peer_record);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// notify the node delegate so it can update the display
|
||||||
|
if( _active_connections.size() != _last_reported_number_of_connections )
|
||||||
|
{
|
||||||
|
_last_reported_number_of_connections = (uint32_t)_active_connections.size();
|
||||||
|
_delegate->connection_count_changed( _last_reported_number_of_connections );
|
||||||
|
}
|
||||||
|
|
||||||
|
_closing_connections.erase(originating_peer_ptr);
|
||||||
|
_handshaking_connections.erase(originating_peer_ptr);
|
||||||
|
_terminating_connections.erase(originating_peer_ptr);
|
||||||
|
|
||||||
|
ilog("Remote peer ${endpoint} closed their connection to us", ("endpoint", originating_peer->get_remote_endpoint()));
|
||||||
|
display_current_connections();
|
||||||
|
trigger_p2p_network_connect_loop();
|
||||||
|
|
||||||
schedule_peer_for_deletion(originating_peer_ptr);
|
schedule_peer_for_deletion(originating_peer_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -3575,25 +3572,24 @@ namespace graphene { namespace net { namespace detail {
|
||||||
{
|
{
|
||||||
dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
|
dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
|
||||||
std::vector< peer_connection_ptr > peers_needing_next_batch;
|
std::vector< peer_connection_ptr > peers_needing_next_batch;
|
||||||
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
|
|
||||||
for (const peer_connection_ptr& peer : _active_connections)
|
|
||||||
{
|
{
|
||||||
auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
|
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
|
||||||
if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
|
for (const peer_connection_ptr &peer : _active_connections) {
|
||||||
{
|
auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
|
||||||
peer->ids_of_items_being_processed.erase(items_being_processed_iter);
|
if (items_being_processed_iter != peer->ids_of_items_being_processed.end()) {
|
||||||
dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
|
peer->ids_of_items_being_processed.erase(items_being_processed_iter);
|
||||||
("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
|
dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
|
||||||
|
("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
|
||||||
|
|
||||||
// if we just processed the last item in our list from this peer, we will want to
|
// if we just processed the last item in our list from this peer, we will want to
|
||||||
// send another request to find out if we are now in sync (this is normally handled in
|
// send another request to find out if we are now in sync (this is normally handled in
|
||||||
// send_sync_block_to_node_delegate)
|
// send_sync_block_to_node_delegate)
|
||||||
if (peer->ids_of_items_to_get.empty() &&
|
if (peer->ids_of_items_to_get.empty() &&
|
||||||
peer->number_of_unfetched_item_ids == 0 &&
|
peer->number_of_unfetched_item_ids == 0 &&
|
||||||
peer->ids_of_items_being_processed.empty())
|
peer->ids_of_items_being_processed.empty()) {
|
||||||
{
|
dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
|
||||||
dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
|
peers_needing_next_batch.push_back(peer);
|
||||||
peers_needing_next_batch.push_back( peer );
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue