From c4e814d7ded6099dc4297aa843d4be7afa98b3ed Mon Sep 17 00:00:00 2001 From: dnotestein Date: Tue, 30 Sep 2014 10:53:23 -0400 Subject: [PATCH] Added more logging to ntp and fixed code indentation. --- src/network/ntp.cpp | 290 +++++++++++++++++++++++--------------------- 1 file changed, 150 insertions(+), 140 deletions(-) diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index dee60dc..dad1bd3 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -14,151 +14,161 @@ namespace fc { namespace detail { - class ntp_impl - { - public: - /** vector < host, port > */ - fc::thread _ntp_thread; - std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; - fc::future _read_loop_done; - udp_socket _sock; - uint32_t _request_interval_sec; - fc::time_point _last_request_time; + class ntp_impl + { + public: + /** vector < host, port > */ + fc::thread _ntp_thread; + std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; + fc::future _read_loop_done; + udp_socket _sock; + uint32_t _request_interval_sec; + fc::time_point _last_request_time; - std::atomic_bool _last_ntp_delta_initialized; - std::atomic _last_ntp_delta_microseconds; + std::atomic_bool _last_ntp_delta_initialized; + std::atomic _last_ntp_delta_microseconds; - fc::future _request_time_task_done; + fc::future _request_time_task_done; - ntp_impl() : - _ntp_thread("ntp"), - _request_interval_sec( 60*60 /* 1 hr */), - _last_ntp_delta_microseconds(0) - { - _last_ntp_delta_initialized = false; - _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); - } + ntp_impl() : + _ntp_thread("ntp"), + _request_interval_sec( 60*60 /* 1 hr */), + _last_ntp_delta_microseconds(0) + { + _last_ntp_delta_initialized = false; + _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); + } - ~ntp_impl() - { - _ntp_thread.quit(); //TODO: this can be removed once fc::threads call quit during destruction - } + ~ntp_impl() + { + _ntp_thread.quit(); //TODO: this can be removed once fc::threads call quit during destruction + } - void request_now() - { - assert(_ntp_thread.is_current()); - for( auto item : _ntp_hosts ) - { - try - { - wlog( "resolving... ${r}", ("r", item) ); - auto eps = resolve( item.first, item.second ); - for( auto ep : eps ) - { - ilog( "sending request to ${ep}", ("ep",ep) ); - std::shared_ptr send_buffer(new char[48], [](char* p){ delete[] p; }); - std::array packet_to_send { {010,0,0,0,0,0,0,0,0} }; - memcpy(send_buffer.get(), packet_to_send.data(), packet_to_send.size()); - _last_request_time = fc::time_point::now(); - _sock.send_to( send_buffer, packet_to_send.size(), ep ); - break; - } - } - catch (const fc::canceled_exception&) - { - throw; - } - // this could fail to resolve but we want to go on to other hosts.. - catch ( const fc::exception& e ) - { - elog( "${e}", ("e",e.to_detail_string() ) ); - } - } - } // request_now + void request_now() + { + assert(_ntp_thread.is_current()); + for( auto item : _ntp_hosts ) + { + try + { + wlog( "resolving... ${r}", ("r", item) ); + auto eps = resolve( item.first, item.second ); + for( auto ep : eps ) + { + wlog( "sending request to ${ep}", ("ep",ep) ); + std::shared_ptr send_buffer(new char[48], [](char* p){ delete[] p; }); + std::array packet_to_send { {010,0,0,0,0,0,0,0,0} }; + memcpy(send_buffer.get(), packet_to_send.data(), packet_to_send.size()); + _last_request_time = fc::time_point::now(); + _sock.send_to( send_buffer, packet_to_send.size(), ep ); + break; + } + } + catch (const fc::canceled_exception&) + { + throw; + } + // this could fail to resolve but we want to go on to other hosts.. + catch ( const fc::exception& e ) + { + elog( "${e}", ("e",e.to_detail_string() ) ); + } + } + } // request_now - //started for first time in ntp() constructor, canceled in ~ntp() destructor - void request_time_task() - { - assert(_ntp_thread.is_current()); - request_now(); - if (!_request_time_task_done.canceled()) - _request_time_task_done = schedule( [=](){ request_time_task(); }, - fc::time_point::now() + fc::seconds(_request_interval_sec), - "request_time_task" ); - } // request_loop + //started for first time in ntp() constructor, canceled in ~ntp() destructor + void request_time_task() + { + assert(_ntp_thread.is_current()); + request_now(); + if (!_request_time_task_done.canceled()) + _request_time_task_done = schedule( [=](){ request_time_task(); }, + fc::time_point::now() + fc::seconds(_request_interval_sec), + "request_time_task" ); + } // request_loop - void start_read_loop() - { - _read_loop_done = _ntp_thread.async( [this](){ read_loop(); }, "ntp_read_loop" ); - } + void start_read_loop() + { + _read_loop_done = _ntp_thread.async( [this](){ read_loop(); }, "ntp_read_loop" ); + } - void read_loop() - { - assert(_ntp_thread.is_current()); + void read_loop() + { + assert(_ntp_thread.is_current()); - uint32_t receive_buffer_size = sizeof(uint64_t) * 1024; - std::shared_ptr receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; }); - uint64_t* recv_buf = (uint64_t*)receive_buffer.get(); + uint32_t receive_buffer_size = sizeof(uint64_t) * 1024; + std::shared_ptr receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; }); + uint64_t* recv_buf = (uint64_t*)receive_buffer.get(); - //outer while to restart read-loop if exception is thrown while waiting to receive on socket. - while( !_read_loop_done.canceled() ) + //outer while to restart read-loop if exception is thrown while waiting to receive on socket. + while( !_read_loop_done.canceled() ) + { + // if you start the read while loop here, the recieve_from call will throw "invalid argument" on win32, + // so instead we start the loop after making our first request + try + { + _sock.open(); + request_time_task(); //this will re-send a time request + + while( !_read_loop_done.canceled() ) + { + fc::ip::endpoint from; + try { - // if you start the read while loop here, the recieve_from call will throw "invalid argument" on win32, - // so instead we start the loop after making our first request - try + _sock.receive_from( receive_buffer, receive_buffer_size, from ); + wlog("received ntp reply from ${from}",("from",from) ); + } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket"); + + uint64_t receive_timestamp_net_order = recv_buf[4]; + uint64_t receive_timestamp_host = bswap_64(receive_timestamp_net_order); + uint32_t fractional_seconds = receive_timestamp_host & 0xffffffff; + uint32_t microseconds = (uint32_t)(((((uint64_t)fractional_seconds) * 1000000) + (UINT64_C(1)<<31)) >> 32); + uint32_t seconds_since_1900 = receive_timestamp_host >> 32; + uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; + + //if the reply we just received has occurred more than a second after our last time request (it was more than a second ago since our last request) + if( fc::time_point::now() - _last_request_time > fc::seconds(1) ) + { + wlog("received stale ntp reply requested at ${request_time}, send a new time request",("request_time",_last_request_time)); + request_now(); //request another reply and ignore this one + } + else //we think we have a timely reply, process it + { + auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds)); + if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && + fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) { - _sock.open(); - request_time_task(); - - while( !_read_loop_done.canceled() ) - { - fc::ip::endpoint from; - try - { - _sock.receive_from( receive_buffer, receive_buffer_size, from ); - } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket"); - - uint64_t receive_timestamp_net_order = recv_buf[4]; - uint64_t receive_timestamp_host = bswap_64(receive_timestamp_net_order); - uint32_t fractional_seconds = receive_timestamp_host & 0xffffffff; - uint32_t microseconds = (uint32_t)(((((uint64_t)fractional_seconds) * 1000000) + (UINT64_C(1)<<31)) >> 32); - uint32_t seconds_since_1900 = receive_timestamp_host >> 32; - uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; - - if( fc::time_point::now() - _last_request_time > fc::seconds(1) ) - request_now(); - else - { - auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds)); - if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && - fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) - { - _last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count(); - _last_ntp_delta_initialized = true; - fc::microseconds ntp_delta_time = fc::microseconds(_last_ntp_delta_microseconds); - wlog("ntp_delta_time updated to ${delta_time}", ("delta_time",ntp_delta_time) ); - } - else - elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); - } - } - } // try - catch (fc::canceled_exception) - { - throw; - } - catch (...) - { - //swallow any other exception and restart loop - elog("unexpected exception in read_loop, going to restart it."); - } - _sock.close(); - fc::usleep(fc::seconds(_request_interval_sec)); - } //outer while loop - wlog("exiting ntp read_loop"); - } //end read_loop() - }; //ntp_impl + _last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count(); + _last_ntp_delta_initialized = true; + fc::microseconds ntp_delta_time = fc::microseconds(_last_ntp_delta_microseconds); + wlog("ntp_delta_time updated to ${delta_time}", ("delta_time",ntp_delta_time) ); + } + else + elog( "NTP time and local time vary by more than a day! ntp:${ntp_time} local:${local}", ("ntp_time",ntp_time)("local",fc::time_point::now()) ); + } + } + } // try + catch (fc::canceled_exception) + { + throw; + } + catch (const fc::exception& e) + { + //swallow any other exception and restart loop + elog("exception in read_loop, going to restart it. ${e}",("e",e)); + } + catch (...) + { + //swallow any other exception and restart loop + elog("unknown exception in read_loop, going to restart it."); + } + _sock.close(); + fc::usleep(fc::seconds(_request_interval_sec)); + } //outer while loop + wlog("exiting ntp read_loop"); + } //end read_loop() + }; //ntp_impl } // namespace detail @@ -167,7 +177,7 @@ namespace fc ntp::ntp() :my( new detail::ntp_impl() ) { - my->start_read_loop(); + my->start_read_loop(); } ntp::~ntp() @@ -205,24 +215,24 @@ namespace fc void ntp::add_server( const std::string& hostname, uint16_t port) { - my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }, "add_server" ).wait(); + my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }, "add_server" ).wait(); } void ntp::set_request_interval( uint32_t interval_sec ) { - my->_request_interval_sec = interval_sec; + my->_request_interval_sec = interval_sec; } void ntp::request_now() { - my->_ntp_thread.async( [=](){ my->request_now(); }, "request_now" ).wait(); + my->_ntp_thread.async( [=](){ my->request_now(); }, "request_now" ).wait(); } optional ntp::get_time()const { - if( my->_last_ntp_delta_initialized ) - return fc::time_point::now() + fc::microseconds(my->_last_ntp_delta_microseconds); - return optional(); + if( my->_last_ntp_delta_initialized ) + return fc::time_point::now() + fc::microseconds(my->_last_ntp_delta_microseconds); + return optional(); } -} +} //namespace fc