fix potential errors in NTP caused by running in main thread

This commit is contained in:
Daniel Larimer 2014-06-25 23:32:09 -04:00
parent d20b9d049b
commit 456c81df27
4 changed files with 133 additions and 35 deletions

View file

@ -151,6 +151,7 @@ set( fc_sources
# src/crypto/romix.cpp # src/crypto/romix.cpp
src/network/tcp_socket.cpp src/network/tcp_socket.cpp
src/network/udp_socket.cpp src/network/udp_socket.cpp
src/network/udt_socket.cpp
src/network/http/http_connection.cpp src/network/http/http_connection.cpp
src/network/http/http_server.cpp src/network/http/http_server.cpp
src/network/ntp.cpp src/network/ntp.cpp

View file

@ -16,6 +16,7 @@ int main( int argc, char** argv )
auto hours = delta.count() / 1000000 / 60 / 60; auto hours = delta.count() / 1000000 / 60 / 60;
auto seconds = delta.count() / 1000000; auto seconds = delta.count() / 1000000;
auto msec= delta.count() / 1000; auto msec= delta.count() / 1000;
idump( (fc::time_point::now() ) );
idump( (ntp_time)(delta)(msec)(seconds)(minutes)(hours) ); idump( (ntp_time)(delta)(msec)(seconds)(minutes)(hours) );
} }
else else

View file

@ -16,20 +16,19 @@ namespace fc
class ntp_impl class ntp_impl
{ {
public: public:
ntp_impl() :_request_interval_sec( 60*60 /* 1 hr */) ntp_impl():_request_interval_sec( 60*60 /* 1 hr */),_ntp_thread("ntp")
{ {
_next_request_time = fc::time_point::now();
_ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) );
} }
/** vector < host, port > */ /** vector < host, port > */
std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; std::vector< std::pair< std::string, uint16_t> > _ntp_hosts;
fc::future<void> _request_loop;
fc::future<void> _read_loop; fc::future<void> _read_loop;
udp_socket _sock; udp_socket _sock;
uint32_t _request_interval_sec; uint32_t _request_interval_sec;
fc::time_point _next_request_time; fc::time_point _last_request_time;
optional<fc::microseconds> _last_ntp_delta; optional<fc::microseconds> _last_ntp_delta;
fc::thread _ntp_thread;
void request_now() void request_now()
{ {
@ -43,6 +42,7 @@ namespace fc
{ {
ilog( "sending request to ${ep}", ("ep",ep) ); ilog( "sending request to ${ep}", ("ep",ep) );
std::array<unsigned char, 48> send_buf { {010,0,0,0,0,0,0,0,0} }; std::array<unsigned char, 48> send_buf { {010,0,0,0,0,0,0,0,0} };
_last_request_time = fc::time_point::now();
_sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep ); _sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep );
break; break;
} }
@ -55,17 +55,10 @@ namespace fc
} }
} // request_now } // request_now
void request_loop() void request_time()
{ {
while( !_request_loop.canceled() ) request_now();
{ _ntp_thread.schedule( [=](){ request_time(); }, fc::time_point::now() + fc::seconds(_request_interval_sec) );
if( _next_request_time < fc::time_point::now() )
{
_next_request_time += fc::seconds( _request_interval_sec );
request_now();
}
fc::usleep( fc::seconds(1) ); // TODO: fix FC timers..
} // while
} // request_loop } // request_loop
void read_loop() void read_loop()
@ -86,14 +79,19 @@ namespace fc
uint32_t seconds_since_1900 = receive_timestamp_host >> 32; uint32_t seconds_since_1900 = receive_timestamp_host >> 32;
uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800;
auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds)); if( fc::time_point::now() - _last_request_time > fc::seconds(1) )
if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && request_now();
fc::time_point::now() - ntp_time < fc::seconds(60*60*24) )
{
_last_ntp_delta = ntp_time - fc::time_point::now();
}
else else
elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); {
auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds));
if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) &&
fc::time_point::now() - ntp_time < fc::seconds(60*60*24) )
{
_last_ntp_delta = ntp_time - fc::time_point::now();
}
else
elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) );
}
} }
} // read_loop } // read_loop
}; };
@ -108,17 +106,15 @@ namespace fc
{ {
my->_sock.open(); my->_sock.open();
my->_request_loop = fc::async( [=](){ my->request_loop(); } ); my->_ntp_thread.async( [=](){ my->request_time(); } );
my->_read_loop = fc::async( [=](){ my->read_loop(); } ); my->_read_loop = my->_ntp_thread.async( [=](){ my->read_loop(); } );
} }
ntp::~ntp() ntp::~ntp()
{ {
try { try {
my->_request_loop.cancel();
my->_read_loop.cancel(); my->_read_loop.cancel();
my->_sock.close(); my->_sock.close();
my->_request_loop.wait();
my->_read_loop.wait(); my->_read_loop.wait();
} }
catch ( const fc::exception& ) catch ( const fc::exception& )
@ -137,8 +133,8 @@ namespace fc
void ntp::set_request_interval( uint32_t interval_sec ) void ntp::set_request_interval( uint32_t interval_sec )
{ {
my->_request_interval_sec = interval_sec; my->_request_interval_sec = interval_sec;
my->_next_request_time = fc::time_point::now();
} }
void ntp::request_now() void ntp::request_now()
{ {
my->request_now(); my->request_now();

View file

@ -1,6 +1,106 @@
#include <fc/network/udt_socket.hpp> #include <fc/network/udt_socket.hpp>
#include <fc/thread/thread.hpp>
#include <fc/thread/mutex.hpp>
#include <fc/thread/unique_lock.hpp>
#include <fc/network/ip.hpp>
#include <udt.h>
#include <arpa/inet.h>
namespace fc { namespace fc {
class udt_epoll_service
{
public:
udt_epoll_service()
:_epoll_thread("udt_epoll")
{
_epoll_id = UDT::epoll_create();
_epoll_loop = _epoll_thread.async( [=](){ poll_loop(); } );
}
~udt_epoll_service()
{
_epoll_loop.cancel();
}
void poll_loop()
{
while( !_epoll_loop.canceled() )
{
std::set<UDTSOCKET> read_ready;
std::set<UDTSOCKET> write_ready;
UDT::epoll_wait( _epoll_id,
&read_ready,
&write_ready, 1000 );
{ synchronized(_read_promises_mutex)
for( auto sock : read_ready )
{
auto itr = _read_promises.find( sock );
if( itr != _read_promises.end() )
{
itr->second->set_value();
_read_promises.erase(itr);
}
}
} // synchronized read promise mutex
{ synchronized(_write_promises_mutex)
for( auto sock : write_ready )
{
auto itr = _write_promises.find( sock );
if( itr != _write_promises.end() )
{
itr->second->set_value();
_write_promises.erase(itr);
}
}
} // synchronized write promise mutex
} // while not canceled
} // poll_loop
void notify_read( int udt_socket_id,
const promise<void>::ptr& p )
{
int events = UDT_EPOLL_IN;
UDT::epoll_add_usock( _epoll_id,
udt_socket_id,
&events );
{ synchronized(_read_promises_mutex)
_read_promises[udt_socket_id] = p;
}
}
void notify_write( int udt_socket_id,
const promise<void>::ptr& p )
{
int events = UDT_EPOLL_OUT;
UDT::epoll_add_usock( _epoll_id,
udt_socket_id,
&events );
{ synchronized(_write_promises_mutex)
_write_promises[udt_socket_id] = p;
}
}
private:
fc::mutex _read_promises_mutex;
fc::mutex _write_promises_mutex;
std::unordered_map<int, promise<void>::ptr > _read_promises;
std::unordered_map<int, promise<void>::ptr > _write_promises;
fc::future<void> _epoll_loop;
fc::thread _epoll_thread;
int _epoll_id;
};
void check_udt_errors() void check_udt_errors()
{ {
@ -18,7 +118,7 @@ namespace fc {
{ {
} }
~udt_socket() udt_socket::~udt_socket()
{ {
close(); close();
} }
@ -28,7 +128,7 @@ namespace fc {
sockaddr_in serv_addr; sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(remote_endpoint.port()); serv_addr.sin_port = htons(remote_endpoint.port());
serv_addr.sin_addr = htonl(remote_endpoint.address()); serv_addr.sin_addr.s_addr = htonl(remote_endpoint.get_address());
// connect to the server, implict bind // connect to the server, implict bind
if (UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr))) if (UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)))
@ -40,20 +140,20 @@ namespace fc {
{ try { { try {
sockaddr_in peer_addr; sockaddr_in peer_addr;
int peer_addr_size = sizeof(peer_addr); int peer_addr_size = sizeof(peer_addr);
int error_code = UDT::getpeername( _udt_socket_id, &peer_addr, &peer_addr_size ); int error_code = UDT::getpeername( _udt_socket_id, (struct sockaddr*)&peer_addr, &peer_addr_size );
if( error_code == UDT::ERROR ) if( error_code == UDT::ERROR )
check_udt_errors(); check_udt_errors();
return ip::endpoint( address( htonl( peer_addr.sin_addr ) ), htons(peer_addr.sin_port) ); return ip::endpoint( ip::address( htonl( peer_addr.sin_addr.s_addr ) ), htons(peer_addr.sin_port) );
} FC_CAPTURE_AND_RETHROW() } } FC_CAPTURE_AND_RETHROW() }
ip::endpoint udt_socket::local_endpoint() const ip::endpoint udt_socket::local_endpoint() const
{ try { { try {
sockaddr_in sock_addr; sockaddr_in sock_addr;
int addr_size = sizeof(peer_addr); int addr_size = sizeof(sock_addr);
int error_code = UDT::getsockname( _udt_socket_id, &sock_addr, &addr_size ); int error_code = UDT::getsockname( _udt_socket_id, (struct sockaddr*)&sock_addr, &addr_size );
if( error_code == UDT::ERROR ) if( error_code == UDT::ERROR )
check_udt_errors(); check_udt_errors();
return ip::endpoint( address( htonl( sock_addr.sin_addr ) ), htons(sock_addr.sin_port) ); return ip::endpoint( ip::address( htonl( sock_addr.sin_addr.s_addr ) ), htons(sock_addr.sin_port) );
} FC_CAPTURE_AND_RETHROW() } } FC_CAPTURE_AND_RETHROW() }
@ -63,7 +163,7 @@ namespace fc {
auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 ); auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 );
if( bytes_read == UDT::ERROR ) if( bytes_read == UDT::ERROR )
{ {
if( UDT::getlasterror().getCode() == UDT::EASYNCRCV ) if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV )
{ {
// create a future and post to epoll, wait on it, then // create a future and post to epoll, wait on it, then
// call readsome recursively. // call readsome recursively.
@ -85,7 +185,7 @@ namespace fc {
/// @{ /// @{
size_t udt_socket::writesome( const char* buffer, size_t len ) size_t udt_socket::writesome( const char* buffer, size_t len )
{ {
auto bytes_sent = UDT::send(_udt_socket_idl, buffer, len, 0); auto bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0);
if( UDT::ERROR == bytes_sent ) if( UDT::ERROR == bytes_sent )
check_udt_errors(); check_udt_errors();