diff --git a/CMakeLists.txt b/CMakeLists.txt index ca390f5..dafcdb0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -236,6 +236,7 @@ set( fc_sources src/network/http/http_server.cpp src/network/http/websocket.cpp src/network/ip.cpp + src/network/ntp.cpp src/network/rate_limiting.cpp src/network/resolve.cpp src/network/url.cpp diff --git a/include/fc/crypto/hash_ctr_rng.hpp b/include/fc/crypto/hash_ctr_rng.hpp new file mode 100644 index 0000000..878cf77 --- /dev/null +++ b/include/fc/crypto/hash_ctr_rng.hpp @@ -0,0 +1,107 @@ +#pragma once + +#include + +namespace fc { + +/** + * Always returns 0. Useful for testing. + */ +class nullary_rng +{ + public: + nullary_rng() {} + virtual ~nullary_rng() {} + + template< typename T > T operator()( T max ) + { return T(0); } +} ; + +/** + * The hash_ctr_rng generates bits using a hash function in counter (CTR) + * mode. + */ +template +class hash_ctr_rng +{ + public: + hash_ctr_rng( const char* seed, uint64_t counter = 0 ) + : _counter( counter ), _current_offset( 0 ) + { + memcpy( _seed, seed, SeedLength ); + _reset_current_value(); + return; + } + + virtual ~hash_ctr_rng() {} + + uint64_t get_bits( uint8_t count ) + { + uint64_t result = 0; + uint64_t mask = 1; + // grab the requested number of bits + while( count > 0 ) + { + result |= + ( + ( + ( + _current_value.data()[ (_current_offset >> 3) & 0x1F ] + & ( 1 << (_current_offset & 0x07) ) + ) + != 0 + ) ? mask : 0 + ); + mask += mask; + --count; + ++_current_offset; + if( _current_offset == (_current_value.data_size() << 3) ) + { + _counter++; + _current_offset = 0; + _reset_current_value(); + } + } + return result; + } + + uint64_t operator()( uint64_t bound ) + { + if( bound <= 1 ) + return 0; + uint8_t bitcount = boost::multiprecision::detail::find_msb( bound ) + 1; + + // probability of loop exiting is >= 1/2, so probability of + // running N times is bounded above by (1/2)^N + while( true ) + { + uint64_t result = get_bits( bitcount ); + if( result < bound ) + return result; + } + } + + // convenience method which does casting for types other than uint64_t + template< typename T > T operator()( T bound ) + { return (T) ( (*this)(uint64_t( bound )) ); } + + void _reset_current_value() + { + // internal implementation detail, called to update + // _current_value when _counter changes + typename HashClass::encoder enc; + enc.write( _seed , SeedLength ); + enc.write( (char *) &_counter, 8 ); + _current_value = enc.result(); + return; + } + + uint64_t _counter; + char _seed[ SeedLength ]; + HashClass _current_value; + uint16_t _current_offset; + + static const int seed_length = SeedLength; +}; + +} // end namespace fc diff --git a/include/fc/network/ntp.hpp b/include/fc/network/ntp.hpp new file mode 100644 index 0000000..6067b3c --- /dev/null +++ b/include/fc/network/ntp.hpp @@ -0,0 +1,27 @@ +#pragma once +#include +#include +#include +#include + + +namespace fc { + + namespace detail { class ntp_impl; } + + class ntp + { + public: + ntp(); + ~ntp(); + + void add_server( const std::string& hostname, uint16_t port = 123 ); + void set_request_interval( uint32_t interval_sec ); + void request_now(); + optional get_time()const; + + private: + std::unique_ptr my; + }; + +} // namespace fc diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp new file mode 100644 index 0000000..5c0a085 --- /dev/null +++ b/src/network/ntp.cpp @@ -0,0 +1,272 @@ +#include +#include +#include +#include +#include + +#include +#include "../byteswap.hpp" + +#include +#include + +namespace fc +{ + namespace detail { + + class ntp_impl + { + public: + /** vector < host, port > */ + fc::thread _ntp_thread; + std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; + fc::future _read_loop_done; + udp_socket _sock; + uint32_t _request_interval_sec; + uint32_t _retry_failed_request_interval_sec; + fc::time_point _last_valid_ntp_reply_received_time; + + std::atomic_bool _last_ntp_delta_initialized; + std::atomic _last_ntp_delta_microseconds; + + + fc::future _request_time_task_done; + + ntp_impl() : + _ntp_thread("ntp"), + _request_interval_sec( 60*60 /* 1 hr */), + _retry_failed_request_interval_sec(60 * 5), + _last_ntp_delta_microseconds(0) + { + _last_ntp_delta_initialized = false; + _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); + } + + ~ntp_impl() + { + } + + fc::time_point ntp_timestamp_to_fc_time_point(uint64_t ntp_timestamp_net_order) + { + uint64_t ntp_timestamp_host = bswap_64(ntp_timestamp_net_order); + uint32_t fractional_seconds = ntp_timestamp_host & 0xffffffff; + uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (uint64_t(1) << 31)) >> 32); + uint32_t seconds_since_1900 = ntp_timestamp_host >> 32; + uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; + return fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds); + } + + uint64_t fc_time_point_to_ntp_timestamp(const fc::time_point& fc_timestamp) + { + uint64_t microseconds_since_epoch = (uint64_t)fc_timestamp.time_since_epoch().count(); + uint32_t seconds_since_epoch = (uint32_t)(microseconds_since_epoch / 1000000); + uint32_t seconds_since_1900 = seconds_since_epoch + 2208988800; + uint32_t microseconds = microseconds_since_epoch % 1000000; + uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (uint64_t(1) << 31)) / 1000000); + uint64_t ntp_timestamp_net_order = ((uint64_t)seconds_since_1900 << 32) + fractional_seconds; + return bswap_64(ntp_timestamp_net_order); + } + + void request_now() + { + assert(_ntp_thread.is_current()); + for( auto item : _ntp_hosts ) + { + try + { + //wlog( "resolving... ${r}", ("r", item) ); + auto eps = resolve( item.first, item.second ); + for( auto ep : eps ) + { + // wlog( "sending request to ${ep}", ("ep",ep) ); + std::shared_ptr send_buffer(new char[48], [](char* p){ delete[] p; }); + std::array packet_to_send { {010,0,0,0,0,0,0,0,0} }; + memcpy(send_buffer.get(), packet_to_send.data(), packet_to_send.size()); + uint64_t* send_buf_as_64_array = (uint64_t*)send_buffer.get(); + send_buf_as_64_array[5] = fc_time_point_to_ntp_timestamp(fc::time_point::now()); // 5 = Transmit Timestamp + _sock.send_to(send_buffer, packet_to_send.size(), ep); + break; + } + } + catch (const fc::canceled_exception&) + { + throw; + } + // this could fail to resolve but we want to go on to other hosts.. + catch ( const fc::exception& e ) + { + elog( "${e}", ("e",e.to_detail_string() ) ); + } + } + } // request_now + + // started for first time in ntp() constructor, canceled in ~ntp() destructor + // this task gets invoked every _retry_failed_request_interval_sec (currently 5 min), and if + // _request_interval_sec (currently 1 hour) has passed since the last successful update, + // it sends a new request + void request_time_task() + { + assert(_ntp_thread.is_current()); + if (_last_valid_ntp_reply_received_time <= fc::time_point::now() - fc::seconds(_request_interval_sec - 5)) + request_now(); + if (!_request_time_task_done.valid() || !_request_time_task_done.canceled()) + _request_time_task_done = schedule( [=](){ request_time_task(); }, + fc::time_point::now() + fc::seconds(_retry_failed_request_interval_sec), + "request_time_task" ); + } // request_loop + + void start_read_loop() + { + _read_loop_done = _ntp_thread.async( [this](){ read_loop(); }, "ntp_read_loop" ); + } + + void read_loop() + { + assert(_ntp_thread.is_current()); + + uint32_t receive_buffer_size = sizeof(uint64_t) * 1024; + std::shared_ptr receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; }); + uint64_t* recv_buf = (uint64_t*)receive_buffer.get(); + + //outer while to restart read-loop if exception is thrown while waiting to receive on socket. + while( !_read_loop_done.canceled() ) + { + // if you start the read while loop here, the recieve_from call will throw "invalid argument" on win32, + // so instead we start the loop after making our first request + try + { + _sock.open(); + request_time_task(); //this will re-send a time request + + while( !_read_loop_done.canceled() ) + { + fc::ip::endpoint from; + try + { + _sock.receive_from( receive_buffer, receive_buffer_size, from ); + // wlog("received ntp reply from ${from}",("from",from) ); + } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket"); + + fc::time_point receive_time = fc::time_point::now(); + fc::time_point origin_time = ntp_timestamp_to_fc_time_point(recv_buf[3]); + fc::time_point server_receive_time = ntp_timestamp_to_fc_time_point(recv_buf[4]); + fc::time_point server_transmit_time = ntp_timestamp_to_fc_time_point(recv_buf[5]); + + fc::microseconds offset(((server_receive_time - origin_time) + + (server_transmit_time - receive_time)).count() / 2); + fc::microseconds round_trip_delay((receive_time - origin_time) - + (server_transmit_time - server_receive_time)); + //wlog("origin_time = ${origin_time}, server_receive_time = ${server_receive_time}, server_transmit_time = ${server_transmit_time}, receive_time = ${receive_time}", + // ("origin_time", origin_time)("server_receive_time", server_receive_time)("server_transmit_time", server_transmit_time)("receive_time", receive_time)); + // wlog("ntp offset: ${offset}, round_trip_delay ${delay}", ("offset", offset)("delay", round_trip_delay)); + + //if the reply we just received has occurred more than a second after our last time request (it was more than a second ago since our last request) + if( round_trip_delay > fc::microseconds(300000) ) + { + wlog("received stale ntp reply requested at ${request_time}, send a new time request", ("request_time", origin_time)); + request_now(); //request another reply and ignore this one + } + else //we think we have a timely reply, process it + { + if( offset < fc::seconds(60*60*24) && offset > fc::seconds(-60*60*24) ) + { + _last_ntp_delta_microseconds = offset.count(); + _last_ntp_delta_initialized = true; + fc::microseconds ntp_delta_time = fc::microseconds(_last_ntp_delta_microseconds); + _last_valid_ntp_reply_received_time = receive_time; + wlog("ntp_delta_time updated to ${delta_time} us", ("delta_time",ntp_delta_time) ); + } + else + elog( "NTP time and local time vary by more than a day! ntp:${ntp_time} local:${local}", + ("ntp_time", receive_time + offset)("local", fc::time_point::now()) ); + } + } + } // try + catch (fc::canceled_exception) + { + throw; + } + catch (const fc::exception& e) + { + //swallow any other exception and restart loop + elog("exception in read_loop, going to restart it. ${e}",("e",e)); + } + catch (...) + { + //swallow any other exception and restart loop + elog("unknown exception in read_loop, going to restart it."); + } + _sock.close(); + fc::usleep(fc::seconds(_retry_failed_request_interval_sec)); + } //outer while loop + wlog("exiting ntp read_loop"); + } //end read_loop() + }; //ntp_impl + + } // namespace detail + + + + ntp::ntp() + :my( new detail::ntp_impl() ) + { + my->start_read_loop(); + } + + ntp::~ntp() + { + my->_ntp_thread.async([=](){ + try + { + my->_request_time_task_done.cancel_and_wait("ntp object is destructing"); + } + catch ( const fc::exception& e ) + { + wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring: ${e}", ("e",e) ); + } + catch (...) + { + wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring" ); + } + + try + { + my->_read_loop_done.cancel_and_wait("ntp object is destructing"); + } + catch ( const fc::exception& e ) + { + wlog( "Exception thrown while shutting down NTP's read_loop, ignoring: ${e}", ("e",e) ); + } + catch (...) + { + wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" ); + } + + }, "ntp_shutdown_task").wait(); + } + + + void ntp::add_server( const std::string& hostname, uint16_t port) + { + my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }, "add_server" ).wait(); + } + + void ntp::set_request_interval( uint32_t interval_sec ) + { + my->_request_interval_sec = interval_sec; + my->_retry_failed_request_interval_sec = std::min(my->_retry_failed_request_interval_sec, interval_sec); + } + + void ntp::request_now() + { + my->_ntp_thread.async( [=](){ my->request_now(); }, "request_now" ).wait(); + } + + optional ntp::get_time()const + { + if( my->_last_ntp_delta_initialized ) + return fc::time_point::now() + fc::microseconds(my->_last_ntp_delta_microseconds); + return optional(); + } + +} //namespace fc