fix udt sockets, actually enable non blocking operation

This commit is contained in:
Daniel Larimer 2014-06-29 01:38:46 -04:00
parent 7cf3526c54
commit c33acad0ab
4 changed files with 108 additions and 42 deletions

View file

@ -184,14 +184,15 @@ namespace fc {
/// @post ready() /// @post ready()
/// @throws timeout /// @throws timeout
const T& wait( const microseconds& timeout = microseconds::maximum() )const { const T& wait( const microseconds& timeout = microseconds::maximum() )const {
return m_prom->wait(timeout); return m_prom->wait(timeout);
} }
/// @pre valid() /// @pre valid()
/// @post ready() /// @post ready()
/// @throws timeout /// @throws timeout
const T& wait_until( const time_point& tp )const { const T& wait_until( const time_point& tp )const {
return m_prom->wait_until(tp); if( m_prom )
return m_prom->wait_until(tp);
} }
bool valid()const { return !!m_prom; } bool valid()const { return !!m_prom; }
@ -205,6 +206,15 @@ namespace fc {
void cancel()const { if( m_prom ) m_prom->cancel(); } void cancel()const { if( m_prom ) m_prom->cancel(); }
bool canceled()const { return m_prom->canceled(); } bool canceled()const { return m_prom->canceled(); }
void cancel_and_wait()
{
if( valid() )
{
cancel();
wait();
}
}
/** /**
* @pre valid() * @pre valid()
* *
@ -239,7 +249,8 @@ namespace fc {
/// @post ready() /// @post ready()
/// @throws timeout /// @throws timeout
void wait( const microseconds& timeout = microseconds::maximum() ){ void wait( const microseconds& timeout = microseconds::maximum() ){
m_prom->wait(timeout); if( m_prom )
m_prom->wait(timeout);
} }
/// @pre valid() /// @pre valid()
@ -252,6 +263,12 @@ namespace fc {
bool valid()const { return !!m_prom; } bool valid()const { return !!m_prom; }
bool canceled()const { return m_prom->canceled(); } bool canceled()const { return m_prom->canceled(); }
void cancel_and_wait()
{
cancel();
wait();
}
/// @pre valid() /// @pre valid()
bool ready()const { return m_prom->ready(); } bool ready()const { return m_prom->ready(); }

View file

@ -43,14 +43,13 @@ namespace fc {
void poll_loop() void poll_loop()
{ {
std::set<UDTSOCKET> read_ready;
std::set<UDTSOCKET> write_ready;
while( !_epoll_loop.canceled() ) while( !_epoll_loop.canceled() )
{ {
std::set<UDTSOCKET> read_ready;
std::set<UDTSOCKET> write_ready;
UDT::epoll_wait( _epoll_id, UDT::epoll_wait( _epoll_id,
&read_ready, &read_ready,
&write_ready, 1000*1000 ); &write_ready, 100000000 );
{ synchronized(_read_promises_mutex) { synchronized(_read_promises_mutex)
for( auto sock : read_ready ) for( auto sock : read_ready )
@ -81,14 +80,15 @@ namespace fc {
void notify_read( int udt_socket_id, void notify_read( int udt_socket_id,
const promise<void>::ptr& p ) const promise<void>::ptr& p )
{ {
int events = UDT_EPOLL_IN; int events = UDT_EPOLL_IN | UDT_EPOLL_ERR;
UDT::epoll_add_usock( _epoll_id, if( 0 != UDT::epoll_add_usock( _epoll_id,
udt_socket_id, udt_socket_id,
&events ); &events ) )
{
check_udt_errors();
}
{ synchronized(_read_promises_mutex) { synchronized(_read_promises_mutex)
_read_promises[udt_socket_id] = p; _read_promises[udt_socket_id] = p;
} }
} }
@ -96,15 +96,22 @@ namespace fc {
void notify_write( int udt_socket_id, void notify_write( int udt_socket_id,
const promise<void>::ptr& p ) const promise<void>::ptr& p )
{ {
int events = UDT_EPOLL_OUT; int events = UDT_EPOLL_OUT | UDT_EPOLL_ERR;
UDT::epoll_add_usock( _epoll_id, if( 0 != UDT::epoll_add_usock( _epoll_id,
udt_socket_id, udt_socket_id,
&events ); &events ) )
{
check_udt_errors();
}
{ synchronized(_write_promises_mutex) { synchronized(_write_promises_mutex)
_write_promises[udt_socket_id] = p; _write_promises[udt_socket_id] = p;
} }
} }
void remove( int udt_socket_id )
{
UDT::epoll_remove_usock( _epoll_id, udt_socket_id );
}
private: private:
fc::mutex _read_promises_mutex; fc::mutex _read_promises_mutex;
@ -165,9 +172,17 @@ namespace fc {
serv_addr.sin_port = htons(remote_endpoint.port()); serv_addr.sin_port = htons(remote_endpoint.port());
serv_addr.sin_addr.s_addr = htonl(remote_endpoint.get_address()); serv_addr.sin_addr.s_addr = htonl(remote_endpoint.get_address());
// connect to the server, implict bind // UDT doesn't allow now blocking connects...
if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) ) fc::thread connect_thread("connect_thread");
check_udt_errors(); connect_thread.async( [&](){
if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) )
check_udt_errors();
}).wait();
bool block = false;
UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool));
UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool));
check_udt_errors();
} FC_CAPTURE_AND_RETHROW( (remote_endpoint) ) } } FC_CAPTURE_AND_RETHROW( (remote_endpoint) ) }
@ -196,7 +211,7 @@ namespace fc {
size_t udt_socket::readsome( char* buffer, size_t max ) size_t udt_socket::readsome( char* buffer, size_t max )
{ try { { try {
auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 ); auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 );
if( bytes_read == UDT::ERROR ) while( bytes_read == UDT::ERROR )
{ {
if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV ) if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV )
{ {
@ -204,7 +219,7 @@ namespace fc {
promise<void>::ptr p(new promise<void>("udt_socket::readsome")); promise<void>::ptr p(new promise<void>("udt_socket::readsome"));
default_epool_service().notify_read( _udt_socket_id, p ); default_epool_service().notify_read( _udt_socket_id, p );
p->wait(); p->wait();
return readsome( buffer, max ); bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 );
} }
else else
check_udt_errors(); check_udt_errors();
@ -222,42 +237,46 @@ namespace fc {
/// ostream interface /// ostream interface
/// @{ /// @{
size_t udt_socket::writesome( const char* buffer, size_t len ) size_t udt_socket::writesome( const char* buffer, size_t len )
{ { try {
auto bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0); auto bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0);
if( UDT::ERROR == bytes_sent ) while( UDT::ERROR == bytes_sent )
{ {
if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV ) if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCSND )
{ {
UDT::getlasterror().clear(); UDT::getlasterror().clear();
promise<void>::ptr p(new promise<void>("udt_socket::writesome")); promise<void>::ptr p(new promise<void>("udt_socket::writesome"));
default_epool_service().notify_write( _udt_socket_id, p ); default_epool_service().notify_write( _udt_socket_id, p );
p->wait(); p->wait();
return writesome( buffer, len ); bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0);
continue;
} }
else else
check_udt_errors(); check_udt_errors();
} }
if( bytes_sent == 0 )
{
// schedule wait with epoll
}
return bytes_sent; return bytes_sent;
} } FC_CAPTURE_AND_RETHROW( (len) ) }
void udt_socket::flush(){} void udt_socket::flush(){}
void udt_socket::close() void udt_socket::close()
{ try { { try {
UDT::close( _udt_socket_id ); if( is_open() )
check_udt_errors(); {
default_epool_service().remove( _udt_socket_id );
UDT::close( _udt_socket_id );
check_udt_errors();
_udt_socket_id = UDT::INVALID_SOCK;
}
} FC_CAPTURE_AND_RETHROW() } } FC_CAPTURE_AND_RETHROW() }
/// @} /// @}
void udt_socket::open() void udt_socket::open()
{ {
_udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0); _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0);
if( _udt_socket_id == UDT::INVALID_SOCK )
check_udt_errors();
} }
bool udt_socket::is_open()const bool udt_socket::is_open()const
@ -274,9 +293,14 @@ namespace fc {
:_udt_socket_id( UDT::INVALID_SOCK ) :_udt_socket_id( UDT::INVALID_SOCK )
{ {
_udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0); _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0);
if( _udt_socket_id == UDT::INVALID_SOCK )
check_udt_errors();
bool block = false; bool block = false;
UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool)); UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool));
check_udt_errors();
UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool)); UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool));
check_udt_errors();
} }
udt_server::~udt_server() udt_server::~udt_server()
@ -291,8 +315,13 @@ namespace fc {
void udt_server::close() void udt_server::close()
{ try { { try {
UDT::close( _udt_socket_id ); if( _udt_socket_id != UDT::INVALID_SOCK )
check_udt_errors(); {
default_epool_service().remove( _udt_socket_id );
UDT::close( _udt_socket_id );
check_udt_errors();
_udt_socket_id = UDT::INVALID_SOCK;
}
} FC_CAPTURE_AND_RETHROW() } } FC_CAPTURE_AND_RETHROW() }
void udt_server::accept( udt_socket& s ) void udt_server::accept( udt_socket& s )

View file

@ -1,6 +1,7 @@
#include <fc/network/udt_socket.hpp> #include <fc/network/udt_socket.hpp>
#include <fc/network/ip.hpp> #include <fc/network/ip.hpp>
#include <fc/exception/exception.hpp> #include <fc/exception/exception.hpp>
#include <fc/thread/thread.hpp>
#include <iostream> #include <iostream>
#include <vector> #include <vector>
@ -11,19 +12,34 @@ int main( int argc, char** argv )
try { try {
udt_socket sock; udt_socket sock;
sock.bind( fc::ip::endpoint::from_string( "127.0.0.1:6666" ) ); sock.bind( fc::ip::endpoint::from_string( "127.0.0.1:6666" ) );
ilog( "." );
sock.connect_to( fc::ip::endpoint::from_string( "127.0.0.1:7777" ) ); sock.connect_to( fc::ip::endpoint::from_string( "127.0.0.1:7777" ) );
ilog( "after connect to..." );
std::cout << "local endpoint: " <<std::string( sock.local_endpoint() ) <<"\n"; std::cout << "local endpoint: " <<std::string( sock.local_endpoint() ) <<"\n";
std::cout << "remote endpoint: " <<std::string( sock.remote_endpoint() ) <<"\n"; std::cout << "remote endpoint: " <<std::string( sock.remote_endpoint() ) <<"\n";
std::string hello = "hello world"; std::string hello = "hello world\n";
sock.write( hello.c_str(), hello.size() ); for( uint32_t i = 0; i < 1000000; ++i )
{
sock.write( hello.c_str(), hello.size() );
}
ilog( "closing" );
sock.close();
usleep( fc::seconds(1) );
/*
std::vector<char> response; std::vector<char> response;
response.resize(1024); response.resize(1024);
int r = sock.readsome( response.data(), response.size() ); int r = sock.readsome( response.data(), response.size() );
while( r )
std::cout << "response: '"<<std::string( response.data(), r ) <<"'\n"; {
std::cout.write( response.data(), r );
r = sock.readsome( response.data(), response.size() );
}
*/
// if we exit too quickly, UDT will not have a chance to
// send the graceful close message.
//fc::usleep( fc::seconds(1) );
} catch ( const fc::exception& e ) } catch ( const fc::exception& e )
{ {
elog( "${e}", ("e",e.to_detail_string() ) ); elog( "${e}", ("e",e.to_detail_string() ) );

View file

@ -20,8 +20,12 @@ int main( int argc, char** argv )
std::vector<char> response; std::vector<char> response;
response.resize(1024); response.resize(1024);
int r = sock.readsome( response.data(), response.size() ); int r = sock.readsome( response.data(), response.size() );
while( r )
std::cout << "request: '"<<std::string( response.data(), r ) <<"' from " << std::string( sock.remote_endpoint() ) <<"\n"; {
std::cout.write( response.data(), r );
r = sock.readsome( response.data(), response.size() );
//sock.write( response.data(), response.size() );
}
std::string goodbye = "goodbye cruel world"; std::string goodbye = "goodbye cruel world";
sock.write( goodbye.c_str(), goodbye.size() ); sock.write( goodbye.c_str(), goodbye.size() );