From b460fd6b41cf87476eda088ac132e9be9a115d20 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Sat, 26 Jul 2014 18:22:38 -0400 Subject: [PATCH] Cleanup all async tasks on exit, thread safety, work in progress. --- src/network/ntp.cpp | 93 +++++++++++++++++++++++++++++++-------------- 1 file changed, 65 insertions(+), 28 deletions(-) diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 560e6cd..bfe428e 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -7,6 +7,7 @@ #include #include "../byteswap.hpp" +#include #include namespace fc @@ -16,22 +17,32 @@ namespace fc class ntp_impl { public: - ntp_impl():_request_interval_sec( 60*60 /* 1 hr */),_ntp_thread("ntp") - { - _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); - } - /** vector < host, port > */ std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; - fc::future _read_loop; + fc::future _read_loop_done; udp_socket _sock; uint32_t _request_interval_sec; fc::time_point _last_request_time; - optional _last_ntp_delta; + + std::atomic_bool _last_ntp_delta_initialized; + std::atomic _last_ntp_delta_microseconds; + fc::thread _ntp_thread; + fc::future _request_time_task_done; + + ntp_impl() : + _request_interval_sec( 60*60 /* 1 hr */), + _last_ntp_delta_microseconds(0), + _ntp_thread("ntp") + { + _last_ntp_delta_initialized = false; + _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); + } + void request_now() { + assert(_ntp_thread.is_current()); for( auto item : _ntp_hosts ) { try @@ -44,28 +55,31 @@ namespace fc std::array send_buf { {010,0,0,0,0,0,0,0,0} }; _last_request_time = fc::time_point::now(); _sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep ); - if (!_read_loop.valid() || _read_loop.ready()) - _read_loop = async( [this](){ read_loop(); } ); + if (!_read_loop_done.valid() || _read_loop_done.ready()) + _read_loop_done = async( [this](){ read_loop(); }, "ntp_read_loop" ); break; } } // this could fail to resolve but we want to go on to other hosts.. catch ( const fc::exception& e ) { - elog( "${e}", ("e",e.to_detail_string() ) ); + elog( "${e}", ("e",e.to_detail_string() ) ); } } } // request_now - void request_time() + void request_time_task() { request_now(); - _ntp_thread.schedule( [=](){ request_time(); }, fc::time_point::now() + fc::seconds(_request_interval_sec) ); + _request_time_task_done = _ntp_thread.schedule( [=](){ request_time_task(); }, + fc::time_point::now() + fc::seconds(_request_interval_sec), + "request_time_task" ); } // request_loop void read_loop() { - while( !_read_loop.canceled() ) + assert(_ntp_thread.is_current()); + while( !_read_loop_done.canceled() ) { fc::ip::endpoint from; std::array recv_buf; @@ -89,7 +103,8 @@ namespace fc if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) { - _last_ntp_delta = ntp_time - fc::time_point::now(); + _last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count(); + _last_ntp_delta_initialized = true; } else elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); @@ -109,27 +124,49 @@ namespace fc my->_sock.open(); // if you start the read loop here, the recieve_from call will throw "invalid argument" on win32, // so instead we trigger the read loop after making our first request - my->_ntp_thread.async( [=](){ my->request_time(); } ); + my->_request_time_task_done = my->_ntp_thread.async( [=](){ my->request_time_task(); }, "request_time_task" ); } ntp::~ntp() { - try { - my->_read_loop.cancel(); + my->_ntp_thread.async([=](){ + try + { + my->_request_time_task_done.cancel_and_wait(); + } + catch ( const fc::exception& e ) + { + wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring: ${e}", ("e",e) ); + } + catch (...) + { + wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring" ); + } + + try + { + my->_read_loop_done.cancel(); my->_sock.close(); - my->_read_loop.wait(); - } - catch ( const fc::exception& ) - { - // we expect canceled exceptions, but cannot throw - // from destructor - } + my->_read_loop_done.wait(); + } + catch ( const fc::canceled_exception& ) + { + } + catch ( const fc::exception& e ) + { + wlog( "Exception thrown while shutting down NTP's read_loop, ignoring: ${e}", ("e",e) ); + } + catch (...) + { + wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" ); + } + }).wait(); } void ntp::add_server( const std::string& hostname, uint16_t port) { - my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); + my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }, "add_server" ).wait(); } void ntp::set_request_interval( uint32_t interval_sec ) @@ -139,13 +176,13 @@ namespace fc void ntp::request_now() { - my->_ntp_thread.async( [=](){ my->request_now(); } ).wait(); + my->_ntp_thread.async( [=](){ my->request_now(); }, "request_now" ).wait(); } optional ntp::get_time()const { - if( my->_last_ntp_delta ) - return fc::time_point::now() + *my->_last_ntp_delta; + if( my->_last_ntp_delta_initialized ) + return fc::time_point::now() + fc::microseconds(my->_last_ntp_delta_microseconds); return optional(); }