diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index 4068dd0..cc2c317 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -92,9 +92,7 @@ namespace fc { bool _ready; mutable spin_yield_lock _spin_yield; thread* _blocked_thread; -#ifndef NDEBUG unsigned _blocked_fiber_count; -#endif time_point _timeout; fc::exception_ptr _exceptp; bool _canceled; diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 31d8b87..500731e 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -18,6 +18,7 @@ namespace fc { public: /** vector < host, port > */ + fc::thread _ntp_thread; std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; fc::future _read_loop_done; udp_socket _sock; @@ -27,19 +28,23 @@ namespace fc std::atomic_bool _last_ntp_delta_initialized; std::atomic _last_ntp_delta_microseconds; - fc::thread _ntp_thread; fc::future _request_time_task_done; ntp_impl() : + _ntp_thread("ntp"), _request_interval_sec( 60*60 /* 1 hr */), - _last_ntp_delta_microseconds(0), - _ntp_thread("ntp") + _last_ntp_delta_microseconds(0) { _last_ntp_delta_initialized = false; _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); } + ~ntp_impl() + { + _ntp_thread.quit(); //TODO: this can be removed once fc::threads call quit during destruction + } + void request_now() { assert(_ntp_thread.is_current()); @@ -55,8 +60,6 @@ namespace fc std::array send_buf { {010,0,0,0,0,0,0,0,0} }; _last_request_time = fc::time_point::now(); _sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep ); - if (!_read_loop_done.valid() || _read_loop_done.ready()) - _read_loop_done = async( [this](){ read_loop(); }, "ntp_read_loop" ); break; } } @@ -83,55 +86,80 @@ namespace fc "request_time_task" ); } // request_loop + void start_read_loop() + { + _read_loop_done = _ntp_thread.async( [this](){ read_loop(); }, "ntp_read_loop" ); + } + void read_loop() { assert(_ntp_thread.is_current()); - while( !_read_loop_done.canceled() ) + + //outer while to restart read-loop if exception is thrown while waiting to receive on socket. + //while( !_read_loop_done.canceled() ) { - fc::ip::endpoint from; - std::array recv_buf; - try - { - _sock.receive_from( (char*)recv_buf.data(), recv_buf.size(), from ); - } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket"); + // if you start the read while loop here, the recieve_from call will throw "invalid argument" on win32, + // so instead we start the loop after making our first request + try + { + _sock.open(); + request_time_task(); - uint64_t receive_timestamp_net_order = recv_buf[4]; - uint64_t receive_timestamp_host = bswap_64(receive_timestamp_net_order); - uint32_t fractional_seconds = receive_timestamp_host & 0xffffffff; - uint32_t microseconds = (uint32_t)(((((uint64_t)fractional_seconds) * 1000000) + (UINT64_C(1)<<31)) >> 32); - uint32_t seconds_since_1900 = receive_timestamp_host >> 32; - uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; + while( !_read_loop_done.canceled() ) + { + fc::ip::endpoint from; + std::array recv_buf; + try + { + _sock.receive_from( (char*)recv_buf.data(), recv_buf.size(), from ); + } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket"); - if( fc::time_point::now() - _last_request_time > fc::seconds(1) ) - request_now(); - else - { - auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds)); - if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && - fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) - { - _last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count(); - _last_ntp_delta_initialized = true; - } - else - elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); - } - } - } // read_loop + uint64_t receive_timestamp_net_order = recv_buf[4]; + uint64_t receive_timestamp_host = bswap_64(receive_timestamp_net_order); + uint32_t fractional_seconds = receive_timestamp_host & 0xffffffff; + uint32_t microseconds = (uint32_t)(((((uint64_t)fractional_seconds) * 1000000) + (UINT64_C(1)<<31)) >> 32); + uint32_t seconds_since_1900 = receive_timestamp_host >> 32; + uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; + + if( fc::time_point::now() - _last_request_time > fc::seconds(1) ) + request_now(); + else + { + auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds)); + if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && + fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) + { + _last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count(); + _last_ntp_delta_initialized = true; + } + else + elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); + } + } + } // try + catch (fc::canceled_exception) + { + throw; + } + catch (...) + { + //swallow any other exception and restart loop + elog("unexpected exception in read_loop, going to restart it."); + } + _sock.close(); + fc::usleep(fc::seconds(_request_interval_sec)); + } //outer while loop + } //end read_loop() }; } // namespace detail - ntp::ntp() :my( new detail::ntp_impl() ) { - my->_sock.open(); - // if you start the read loop here, the recieve_from call will throw "invalid argument" on win32, - // so instead we trigger the read loop after making our first request - my->_request_time_task_done = my->_ntp_thread.async( [=](){ my->request_time_task(); }, "request_time_task" ); + my->start_read_loop(); } ntp::~ntp() @@ -152,13 +180,8 @@ namespace fc try { - my->_read_loop_done.cancel("ntp object is destructing"); - my->_sock.close(); - my->_read_loop_done.wait(); + my->_read_loop_done.cancel_and_wait("ntp object is destructing"); } - catch ( const fc::canceled_exception& ) - { - } catch ( const fc::exception& e ) { wlog( "Exception thrown while shutting down NTP's read_loop, ignoring: ${e}", ("e",e) ); @@ -167,6 +190,7 @@ namespace fc { wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" ); } + }, "ntp_shutdown_task").wait(); } diff --git a/src/network/udp_socket.cpp b/src/network/udp_socket.cpp index 45df767..02ef9e5 100644 --- a/src/network/udp_socket.cpp +++ b/src/network/udp_socket.cpp @@ -24,11 +24,24 @@ namespace fc { } udp_socket::udp_socket() - :my( new impl() ) { + :my( new impl() ) + { } + udp_socket::udp_socket( const udp_socket& s ) - :my(s.my){} - udp_socket::~udp_socket() { + :my(s.my) + { + } + + udp_socket::~udp_socket() + { + try + { + my->_sock.close(); //close boost socket to make any pending reads run their completion handler + } + catch (...) //avoid destructor throw and likely this is just happening because socket wasn't open. + { + } } size_t udp_socket::send_to( const char* b, size_t l, const ip::endpoint& to ) { @@ -71,8 +84,8 @@ namespace fc { boost::asio::ip::udp::endpoint from; promise::ptr p(new promise("udp_socket::send_to")); my->_sock.async_receive_from( boost::asio::buffer(b,l), from, - [=]( const boost::system::error_code& ec, size_t bt ) { - if( !ec ) p->set_value(bt); + [=]( const boost::system::error_code& ec, size_t bytes_transferred ) { + if( !ec ) p->set_value(bytes_transferred); else p->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); diff --git a/src/thread/future.cpp b/src/thread/future.cpp index 92d4fea..d0b5252 100644 --- a/src/thread/future.cpp +++ b/src/thread/future.cpp @@ -12,9 +12,7 @@ namespace fc { promise_base::promise_base( const char* desc ) :_ready(false), _blocked_thread(nullptr), -#ifndef NDEBUG _blocked_fiber_count(0), -#endif _timeout(time_point::maximum()), _canceled(false), #ifndef NDEBUG @@ -64,29 +62,35 @@ namespace fc { } _enqueue_thread(); } - thread::current().wait_until( ptr(this,true), timeout_us ); + try + { + thread::current().wait_until( ptr(this,true), timeout_us ); + } + catch (...) + { + _dequeue_thread(); + throw; + } _dequeue_thread(); - if( _ready ) { - if( _exceptp ) _exceptp->dynamic_rethrow_exception(); + if( _ready ) + { + if( _exceptp ) + _exceptp->dynamic_rethrow_exception(); return; } FC_THROW_EXCEPTION( timeout_exception, "" ); } void promise_base::_enqueue_thread(){ -#ifndef NDEBUG ++_blocked_fiber_count; // only one thread can wait on a promise at any given time assert(!_blocked_thread || _blocked_thread == &thread::current()); -#endif _blocked_thread = &thread::current(); } void promise_base::_dequeue_thread(){ -#ifndef NDEBUG synchronized(_spin_yield) if (!--_blocked_fiber_count) _blocked_thread = nullptr; -#endif } void promise_base::_notify(){ // copy _blocked_thread into a local so that if the thread unblocks (e.g., @@ -109,6 +113,8 @@ namespace fc { // slog( "%p == %d", &_ready, int(_ready)); // BOOST_ASSERT( !_ready ); { synchronized(_spin_yield) + if (_ready) //don't allow promise to be set more than once + return; _ready = true; } _notify();