From c33acad0abecc144127d8fa1ab8867962ff69e80 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Sun, 29 Jun 2014 01:38:46 -0400 Subject: [PATCH] fix udt sockets, actually enable non blocking operation --- include/fc/thread/future.hpp | 23 +++++++-- src/network/udt_socket.cpp | 93 +++++++++++++++++++++++------------- tests/udtc.cpp | 26 ++++++++-- tests/udts.cpp | 8 +++- 4 files changed, 108 insertions(+), 42 deletions(-) diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index cf57a24..8ed6830 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -184,14 +184,15 @@ namespace fc { /// @post ready() /// @throws timeout const T& wait( const microseconds& timeout = microseconds::maximum() )const { - return m_prom->wait(timeout); + return m_prom->wait(timeout); } /// @pre valid() /// @post ready() /// @throws timeout const T& wait_until( const time_point& tp )const { - return m_prom->wait_until(tp); + if( m_prom ) + return m_prom->wait_until(tp); } bool valid()const { return !!m_prom; } @@ -205,6 +206,15 @@ namespace fc { void cancel()const { if( m_prom ) m_prom->cancel(); } bool canceled()const { return m_prom->canceled(); } + void cancel_and_wait() + { + if( valid() ) + { + cancel(); + wait(); + } + } + /** * @pre valid() * @@ -239,7 +249,8 @@ namespace fc { /// @post ready() /// @throws timeout void wait( const microseconds& timeout = microseconds::maximum() ){ - m_prom->wait(timeout); + if( m_prom ) + m_prom->wait(timeout); } /// @pre valid() @@ -252,6 +263,12 @@ namespace fc { bool valid()const { return !!m_prom; } bool canceled()const { return m_prom->canceled(); } + void cancel_and_wait() + { + cancel(); + wait(); + } + /// @pre valid() bool ready()const { return m_prom->ready(); } diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp index ba75ae8..7b91afa 100644 --- a/src/network/udt_socket.cpp +++ b/src/network/udt_socket.cpp @@ -43,14 +43,13 @@ namespace fc { void poll_loop() { + std::set read_ready; + std::set write_ready; while( !_epoll_loop.canceled() ) { - std::set read_ready; - std::set write_ready; - UDT::epoll_wait( _epoll_id, &read_ready, - &write_ready, 1000*1000 ); + &write_ready, 100000000 ); { synchronized(_read_promises_mutex) for( auto sock : read_ready ) @@ -81,14 +80,15 @@ namespace fc { void notify_read( int udt_socket_id, const promise::ptr& p ) { - int events = UDT_EPOLL_IN; - UDT::epoll_add_usock( _epoll_id, - udt_socket_id, - &events ); - + int events = UDT_EPOLL_IN | UDT_EPOLL_ERR; + if( 0 != UDT::epoll_add_usock( _epoll_id, + udt_socket_id, + &events ) ) + { + check_udt_errors(); + } { synchronized(_read_promises_mutex) - _read_promises[udt_socket_id] = p; } } @@ -96,15 +96,22 @@ namespace fc { void notify_write( int udt_socket_id, const promise::ptr& p ) { - int events = UDT_EPOLL_OUT; - UDT::epoll_add_usock( _epoll_id, + int events = UDT_EPOLL_OUT | UDT_EPOLL_ERR; + if( 0 != UDT::epoll_add_usock( _epoll_id, udt_socket_id, - &events ); + &events ) ) + { + check_udt_errors(); + } { synchronized(_write_promises_mutex) _write_promises[udt_socket_id] = p; } } + void remove( int udt_socket_id ) + { + UDT::epoll_remove_usock( _epoll_id, udt_socket_id ); + } private: fc::mutex _read_promises_mutex; @@ -165,9 +172,17 @@ namespace fc { serv_addr.sin_port = htons(remote_endpoint.port()); serv_addr.sin_addr.s_addr = htonl(remote_endpoint.get_address()); - // connect to the server, implict bind - if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) ) - check_udt_errors(); + // UDT doesn't allow now blocking connects... + fc::thread connect_thread("connect_thread"); + connect_thread.async( [&](){ + if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) ) + check_udt_errors(); + }).wait(); + + bool block = false; + UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool)); + UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool)); + check_udt_errors(); } FC_CAPTURE_AND_RETHROW( (remote_endpoint) ) } @@ -196,7 +211,7 @@ namespace fc { size_t udt_socket::readsome( char* buffer, size_t max ) { try { auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 ); - if( bytes_read == UDT::ERROR ) + while( bytes_read == UDT::ERROR ) { if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV ) { @@ -204,7 +219,7 @@ namespace fc { promise::ptr p(new promise("udt_socket::readsome")); default_epool_service().notify_read( _udt_socket_id, p ); p->wait(); - return readsome( buffer, max ); + bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 ); } else check_udt_errors(); @@ -222,42 +237,46 @@ namespace fc { /// ostream interface /// @{ size_t udt_socket::writesome( const char* buffer, size_t len ) - { + { try { auto bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0); - if( UDT::ERROR == bytes_sent ) + while( UDT::ERROR == bytes_sent ) { - if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV ) + if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCSND ) { UDT::getlasterror().clear(); promise::ptr p(new promise("udt_socket::writesome")); default_epool_service().notify_write( _udt_socket_id, p ); p->wait(); - return writesome( buffer, len ); + bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0); + continue; } else check_udt_errors(); } - - if( bytes_sent == 0 ) - { - // schedule wait with epoll - } return bytes_sent; - } + } FC_CAPTURE_AND_RETHROW( (len) ) } void udt_socket::flush(){} void udt_socket::close() { try { - UDT::close( _udt_socket_id ); - check_udt_errors(); + if( is_open() ) + { + default_epool_service().remove( _udt_socket_id ); + + UDT::close( _udt_socket_id ); + check_udt_errors(); + _udt_socket_id = UDT::INVALID_SOCK; + } } FC_CAPTURE_AND_RETHROW() } /// @} void udt_socket::open() { _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0); + if( _udt_socket_id == UDT::INVALID_SOCK ) + check_udt_errors(); } bool udt_socket::is_open()const @@ -274,9 +293,14 @@ namespace fc { :_udt_socket_id( UDT::INVALID_SOCK ) { _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0); + if( _udt_socket_id == UDT::INVALID_SOCK ) + check_udt_errors(); + bool block = false; UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool)); + check_udt_errors(); UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool)); + check_udt_errors(); } udt_server::~udt_server() @@ -291,8 +315,13 @@ namespace fc { void udt_server::close() { try { - UDT::close( _udt_socket_id ); - check_udt_errors(); + if( _udt_socket_id != UDT::INVALID_SOCK ) + { + default_epool_service().remove( _udt_socket_id ); + UDT::close( _udt_socket_id ); + check_udt_errors(); + _udt_socket_id = UDT::INVALID_SOCK; + } } FC_CAPTURE_AND_RETHROW() } void udt_server::accept( udt_socket& s ) diff --git a/tests/udtc.cpp b/tests/udtc.cpp index adfd497..aa48bb3 100644 --- a/tests/udtc.cpp +++ b/tests/udtc.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -11,19 +12,34 @@ int main( int argc, char** argv ) try { udt_socket sock; sock.bind( fc::ip::endpoint::from_string( "127.0.0.1:6666" ) ); + ilog( "." ); sock.connect_to( fc::ip::endpoint::from_string( "127.0.0.1:7777" ) ); + ilog( "after connect to..." ); std::cout << "local endpoint: " < response; response.resize(1024); int r = sock.readsome( response.data(), response.size() ); - - std::cout << "response: '"< response; response.resize(1024); int r = sock.readsome( response.data(), response.size() ); - - std::cout << "request: '"<