Dequeue a thread from a blocking promise's notify list when the all the thread's tasks that are waiting on that promise are canceled. This prevents a crash when the promise is fulfilled after the thread is destroyed.

Re-organize read-loop execution to avoid crashes in read_loop on Win32 when ntp object destructs. Call quit on ntp_thread when ntp object destructs to free up thread (eventually we need to make fc::threads call quit in their destructor, but more work is required to make that work properly).
~fc::udp_socket now closes socket on destruction (consider doing this for tcp sockets as well).
This commit is contained in:
dnotestein 2014-09-08 15:31:13 -04:00
parent 1360eabd8c
commit 10fdbcf5b3
4 changed files with 102 additions and 61 deletions

View file

@ -92,9 +92,7 @@ namespace fc {
bool _ready;
mutable spin_yield_lock _spin_yield;
thread* _blocked_thread;
#ifndef NDEBUG
unsigned _blocked_fiber_count;
#endif
time_point _timeout;
fc::exception_ptr _exceptp;
bool _canceled;

View file

@ -18,6 +18,7 @@ namespace fc
{
public:
/** vector < host, port > */
fc::thread _ntp_thread;
std::vector< std::pair< std::string, uint16_t> > _ntp_hosts;
fc::future<void> _read_loop_done;
udp_socket _sock;
@ -27,19 +28,23 @@ namespace fc
std::atomic_bool _last_ntp_delta_initialized;
std::atomic<int64_t> _last_ntp_delta_microseconds;
fc::thread _ntp_thread;
fc::future<void> _request_time_task_done;
ntp_impl() :
_ntp_thread("ntp"),
_request_interval_sec( 60*60 /* 1 hr */),
_last_ntp_delta_microseconds(0),
_ntp_thread("ntp")
_last_ntp_delta_microseconds(0)
{
_last_ntp_delta_initialized = false;
_ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) );
}
~ntp_impl()
{
_ntp_thread.quit(); //TODO: this can be removed once fc::threads call quit during destruction
}
void request_now()
{
assert(_ntp_thread.is_current());
@ -55,8 +60,6 @@ namespace fc
std::array<unsigned char, 48> send_buf { {010,0,0,0,0,0,0,0,0} };
_last_request_time = fc::time_point::now();
_sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep );
if (!_read_loop_done.valid() || _read_loop_done.ready())
_read_loop_done = async( [this](){ read_loop(); }, "ntp_read_loop" );
break;
}
}
@ -83,55 +86,80 @@ namespace fc
"request_time_task" );
} // request_loop
void start_read_loop()
{
_read_loop_done = _ntp_thread.async( [this](){ read_loop(); }, "ntp_read_loop" );
}
void read_loop()
{
assert(_ntp_thread.is_current());
while( !_read_loop_done.canceled() )
//outer while to restart read-loop if exception is thrown while waiting to receive on socket.
//while( !_read_loop_done.canceled() )
{
fc::ip::endpoint from;
std::array<uint64_t, 1024> recv_buf;
try
{
_sock.receive_from( (char*)recv_buf.data(), recv_buf.size(), from );
} FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket");
// if you start the read while loop here, the recieve_from call will throw "invalid argument" on win32,
// so instead we start the loop after making our first request
try
{
_sock.open();
request_time_task();
uint64_t receive_timestamp_net_order = recv_buf[4];
uint64_t receive_timestamp_host = bswap_64(receive_timestamp_net_order);
uint32_t fractional_seconds = receive_timestamp_host & 0xffffffff;
uint32_t microseconds = (uint32_t)(((((uint64_t)fractional_seconds) * 1000000) + (UINT64_C(1)<<31)) >> 32);
uint32_t seconds_since_1900 = receive_timestamp_host >> 32;
uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800;
while( !_read_loop_done.canceled() )
{
fc::ip::endpoint from;
std::array<uint64_t, 1024> recv_buf;
try
{
_sock.receive_from( (char*)recv_buf.data(), recv_buf.size(), from );
} FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket");
if( fc::time_point::now() - _last_request_time > fc::seconds(1) )
request_now();
else
{
auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds));
if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) &&
fc::time_point::now() - ntp_time < fc::seconds(60*60*24) )
{
_last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count();
_last_ntp_delta_initialized = true;
}
else
elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) );
}
}
} // read_loop
uint64_t receive_timestamp_net_order = recv_buf[4];
uint64_t receive_timestamp_host = bswap_64(receive_timestamp_net_order);
uint32_t fractional_seconds = receive_timestamp_host & 0xffffffff;
uint32_t microseconds = (uint32_t)(((((uint64_t)fractional_seconds) * 1000000) + (UINT64_C(1)<<31)) >> 32);
uint32_t seconds_since_1900 = receive_timestamp_host >> 32;
uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800;
if( fc::time_point::now() - _last_request_time > fc::seconds(1) )
request_now();
else
{
auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds));
if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) &&
fc::time_point::now() - ntp_time < fc::seconds(60*60*24) )
{
_last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count();
_last_ntp_delta_initialized = true;
}
else
elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) );
}
}
} // try
catch (fc::canceled_exception)
{
throw;
}
catch (...)
{
//swallow any other exception and restart loop
elog("unexpected exception in read_loop, going to restart it.");
}
_sock.close();
fc::usleep(fc::seconds(_request_interval_sec));
} //outer while loop
} //end read_loop()
};
} // namespace detail
ntp::ntp()
:my( new detail::ntp_impl() )
{
my->_sock.open();
// if you start the read loop here, the recieve_from call will throw "invalid argument" on win32,
// so instead we trigger the read loop after making our first request
my->_request_time_task_done = my->_ntp_thread.async( [=](){ my->request_time_task(); }, "request_time_task" );
my->start_read_loop();
}
ntp::~ntp()
@ -152,13 +180,8 @@ namespace fc
try
{
my->_read_loop_done.cancel("ntp object is destructing");
my->_sock.close();
my->_read_loop_done.wait();
my->_read_loop_done.cancel_and_wait("ntp object is destructing");
}
catch ( const fc::canceled_exception& )
{
}
catch ( const fc::exception& e )
{
wlog( "Exception thrown while shutting down NTP's read_loop, ignoring: ${e}", ("e",e) );
@ -167,6 +190,7 @@ namespace fc
{
wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" );
}
}, "ntp_shutdown_task").wait();
}

View file

@ -24,11 +24,24 @@ namespace fc {
}
udp_socket::udp_socket()
:my( new impl() ) {
:my( new impl() )
{
}
udp_socket::udp_socket( const udp_socket& s )
:my(s.my){}
udp_socket::~udp_socket() {
:my(s.my)
{
}
udp_socket::~udp_socket()
{
try
{
my->_sock.close(); //close boost socket to make any pending reads run their completion handler
}
catch (...) //avoid destructor throw and likely this is just happening because socket wasn't open.
{
}
}
size_t udp_socket::send_to( const char* b, size_t l, const ip::endpoint& to ) {
@ -71,8 +84,8 @@ namespace fc {
boost::asio::ip::udp::endpoint from;
promise<size_t>::ptr p(new promise<size_t>("udp_socket::send_to"));
my->_sock.async_receive_from( boost::asio::buffer(b,l), from,
[=]( const boost::system::error_code& ec, size_t bt ) {
if( !ec ) p->set_value(bt);
[=]( const boost::system::error_code& ec, size_t bytes_transferred ) {
if( !ec ) p->set_value(bytes_transferred);
else p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ",
("message", boost::system::system_error(ec).what())) ) ) );

View file

@ -12,9 +12,7 @@ namespace fc {
promise_base::promise_base( const char* desc )
:_ready(false),
_blocked_thread(nullptr),
#ifndef NDEBUG
_blocked_fiber_count(0),
#endif
_timeout(time_point::maximum()),
_canceled(false),
#ifndef NDEBUG
@ -64,29 +62,35 @@ namespace fc {
}
_enqueue_thread();
}
thread::current().wait_until( ptr(this,true), timeout_us );
try
{
thread::current().wait_until( ptr(this,true), timeout_us );
}
catch (...)
{
_dequeue_thread();
throw;
}
_dequeue_thread();
if( _ready ) {
if( _exceptp ) _exceptp->dynamic_rethrow_exception();
if( _ready )
{
if( _exceptp )
_exceptp->dynamic_rethrow_exception();
return;
}
FC_THROW_EXCEPTION( timeout_exception, "" );
}
void promise_base::_enqueue_thread(){
#ifndef NDEBUG
++_blocked_fiber_count;
// only one thread can wait on a promise at any given time
assert(!_blocked_thread ||
_blocked_thread == &thread::current());
#endif
_blocked_thread = &thread::current();
}
void promise_base::_dequeue_thread(){
#ifndef NDEBUG
synchronized(_spin_yield)
if (!--_blocked_fiber_count)
_blocked_thread = nullptr;
#endif
}
void promise_base::_notify(){
// copy _blocked_thread into a local so that if the thread unblocks (e.g.,
@ -109,6 +113,8 @@ namespace fc {
// slog( "%p == %d", &_ready, int(_ready));
// BOOST_ASSERT( !_ready );
{ synchronized(_spin_yield)
if (_ready) //don't allow promise to be set more than once
return;
_ready = true;
}
_notify();