Cleanup all async tasks on exit, thread safety, work in progress.

This commit is contained in:
Eric Frias 2014-07-26 18:22:38 -04:00
parent 600ae24657
commit b460fd6b41

View file

@ -7,6 +7,7 @@
#include <stdint.h> #include <stdint.h>
#include "../byteswap.hpp" #include "../byteswap.hpp"
#include <atomic>
#include <array> #include <array>
namespace fc namespace fc
@ -16,22 +17,32 @@ namespace fc
class ntp_impl class ntp_impl
{ {
public: public:
ntp_impl():_request_interval_sec( 60*60 /* 1 hr */),_ntp_thread("ntp")
{
_ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) );
}
/** vector < host, port > */ /** vector < host, port > */
std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; std::vector< std::pair< std::string, uint16_t> > _ntp_hosts;
fc::future<void> _read_loop; fc::future<void> _read_loop_done;
udp_socket _sock; udp_socket _sock;
uint32_t _request_interval_sec; uint32_t _request_interval_sec;
fc::time_point _last_request_time; fc::time_point _last_request_time;
optional<fc::microseconds> _last_ntp_delta;
std::atomic_bool _last_ntp_delta_initialized;
std::atomic<int64_t> _last_ntp_delta_microseconds;
fc::thread _ntp_thread; fc::thread _ntp_thread;
fc::future<void> _request_time_task_done;
ntp_impl() :
_request_interval_sec( 60*60 /* 1 hr */),
_last_ntp_delta_microseconds(0),
_ntp_thread("ntp")
{
_last_ntp_delta_initialized = false;
_ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) );
}
void request_now() void request_now()
{ {
assert(_ntp_thread.is_current());
for( auto item : _ntp_hosts ) for( auto item : _ntp_hosts )
{ {
try try
@ -44,28 +55,31 @@ namespace fc
std::array<unsigned char, 48> send_buf { {010,0,0,0,0,0,0,0,0} }; std::array<unsigned char, 48> send_buf { {010,0,0,0,0,0,0,0,0} };
_last_request_time = fc::time_point::now(); _last_request_time = fc::time_point::now();
_sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep ); _sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep );
if (!_read_loop.valid() || _read_loop.ready()) if (!_read_loop_done.valid() || _read_loop_done.ready())
_read_loop = async( [this](){ read_loop(); } ); _read_loop_done = async( [this](){ read_loop(); }, "ntp_read_loop" );
break; break;
} }
} }
// this could fail to resolve but we want to go on to other hosts.. // this could fail to resolve but we want to go on to other hosts..
catch ( const fc::exception& e ) catch ( const fc::exception& e )
{ {
elog( "${e}", ("e",e.to_detail_string() ) ); elog( "${e}", ("e",e.to_detail_string() ) );
} }
} }
} // request_now } // request_now
void request_time() void request_time_task()
{ {
request_now(); request_now();
_ntp_thread.schedule( [=](){ request_time(); }, fc::time_point::now() + fc::seconds(_request_interval_sec) ); _request_time_task_done = _ntp_thread.schedule( [=](){ request_time_task(); },
fc::time_point::now() + fc::seconds(_request_interval_sec),
"request_time_task" );
} // request_loop } // request_loop
void read_loop() void read_loop()
{ {
while( !_read_loop.canceled() ) assert(_ntp_thread.is_current());
while( !_read_loop_done.canceled() )
{ {
fc::ip::endpoint from; fc::ip::endpoint from;
std::array<uint64_t, 1024> recv_buf; std::array<uint64_t, 1024> recv_buf;
@ -89,7 +103,8 @@ namespace fc
if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) &&
fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) fc::time_point::now() - ntp_time < fc::seconds(60*60*24) )
{ {
_last_ntp_delta = ntp_time - fc::time_point::now(); _last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count();
_last_ntp_delta_initialized = true;
} }
else else
elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) );
@ -109,27 +124,49 @@ namespace fc
my->_sock.open(); my->_sock.open();
// if you start the read loop here, the recieve_from call will throw "invalid argument" on win32, // if you start the read loop here, the recieve_from call will throw "invalid argument" on win32,
// so instead we trigger the read loop after making our first request // so instead we trigger the read loop after making our first request
my->_ntp_thread.async( [=](){ my->request_time(); } ); my->_request_time_task_done = my->_ntp_thread.async( [=](){ my->request_time_task(); }, "request_time_task" );
} }
ntp::~ntp() ntp::~ntp()
{ {
try { my->_ntp_thread.async([=](){
my->_read_loop.cancel(); try
{
my->_request_time_task_done.cancel_and_wait();
}
catch ( const fc::exception& e )
{
wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring: ${e}", ("e",e) );
}
catch (...)
{
wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring" );
}
try
{
my->_read_loop_done.cancel();
my->_sock.close(); my->_sock.close();
my->_read_loop.wait(); my->_read_loop_done.wait();
} }
catch ( const fc::exception& ) catch ( const fc::canceled_exception& )
{ {
// we expect canceled exceptions, but cannot throw }
// from destructor catch ( const fc::exception& e )
} {
wlog( "Exception thrown while shutting down NTP's read_loop, ignoring: ${e}", ("e",e) );
}
catch (...)
{
wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" );
}
}).wait();
} }
void ntp::add_server( const std::string& hostname, uint16_t port) void ntp::add_server( const std::string& hostname, uint16_t port)
{ {
my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }, "add_server" ).wait();
} }
void ntp::set_request_interval( uint32_t interval_sec ) void ntp::set_request_interval( uint32_t interval_sec )
@ -139,13 +176,13 @@ namespace fc
void ntp::request_now() void ntp::request_now()
{ {
my->_ntp_thread.async( [=](){ my->request_now(); } ).wait(); my->_ntp_thread.async( [=](){ my->request_now(); }, "request_now" ).wait();
} }
optional<time_point> ntp::get_time()const optional<time_point> ntp::get_time()const
{ {
if( my->_last_ntp_delta ) if( my->_last_ntp_delta_initialized )
return fc::time_point::now() + *my->_last_ntp_delta; return fc::time_point::now() + fc::microseconds(my->_last_ntp_delta_microseconds);
return optional<time_point>(); return optional<time_point>();
} }