diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index 2434d2b..36a2727 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -26,7 +26,7 @@ namespace asio { boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ); - void error_handler( const promise::ptr& p, + void error_handler( const promise::ptr& p, const boost::system::error_code& ec ); void error_handler_ec( promise* p, const boost::system::error_code& ec ); @@ -130,10 +130,11 @@ namespace asio { */ template void accept( AcceptorType& acc, SocketType& sock ) { - promise::ptr p( new promise("fc::asio::tcp::accept") ); + //promise::ptr p( new promise("fc::asio::tcp::accept") ); + promise::ptr p( new promise("fc::asio::tcp::accept") ); acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); - auto ec = p->wait(); - if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); + p->wait(); + //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); } /** @brief wraps boost::asio::socket::async_connect @@ -142,10 +143,10 @@ namespace asio { */ template void connect( AsyncSocket& sock, const EndpointType& ep ) { - promise::ptr p(new promise("fc::asio::tcp::connect")); + promise::ptr p(new promise("fc::asio::tcp::connect")); sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); - auto ec = p->wait(); - if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); + p->wait(); + //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); } } namespace udp { diff --git a/include/fc/io/varint.hpp b/include/fc/io/varint.hpp index 9f5cebd..0ab2ce9 100644 --- a/include/fc/io/varint.hpp +++ b/include/fc/io/varint.hpp @@ -12,6 +12,11 @@ struct unsigned_int { unsigned_int& operator=( const T& v ) { value = v; return *this; } uint32_t value; + + template + friend bool operator==( const unsigned_int& i, const T& v ) { return v == i.value; } + template + friend bool operator!=( const unsigned_int& i, const T& v ) { return v != i.value; } }; struct signed_int { diff --git a/src/asio.cpp b/src/asio.cpp index 3e4dbdb..9a52288 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -11,7 +11,12 @@ namespace fc { else { // elog( "%s", boost::system::system_error(ec).what() ); // p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); - if( ec == boost::asio::error::eof ) + if( ec == boost::asio::error::operation_aborted ) + { + p->set_exception( fc::exception_ptr( new fc::canceled_exception( + FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + } + else if( ec == boost::asio::error::eof ) { p->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); @@ -28,9 +33,28 @@ namespace fc { p->set_value(bytes_transferred); *oec = ec; } - void error_handler( const promise::ptr& p, + void error_handler( const promise::ptr& p, const boost::system::error_code& ec ) { - p->set_value(ec); + if( !ec ) p->set_value(); + else + { + if( ec == boost::asio::error::operation_aborted ) + { + p->set_exception( fc::exception_ptr( new fc::canceled_exception( + FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + } + else if( ec == boost::asio::error::eof ) + { + p->set_exception( fc::exception_ptr( new fc::eof_exception( + FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + } + else + { + // elog( "${message} ", ("message", boost::system::system_error(ec).what())); + p->set_exception( fc::exception_ptr( new fc::exception( + FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + } + } } void error_handler_ec( promise* p, @@ -63,27 +87,18 @@ namespace fc { boost::asio::io_service& default_io_service(bool cleanup) { static boost::asio::io_service io; static boost::asio::io_service::work the_work(io); - static fc::thread fc1("asio1"); - static fc::thread fc2("asio2"); - static fc::thread fc3("asio3"); - static fc::future future1( fc1.async([=]() { io.run(); }) ); - static fc::future future2( fc2.async([=]() { io.run(); }) ); - static fc::future future3( fc3.async([=]() { io.run(); }) ); - /* - static boost::thread io_t([=] { fc1 = &fc::thread::current(); fc1->set_name("asio1"); io.run(); }); - static boost::thread io_t2([=]{ fc2 = &fc::thread::current(); fc2->set_name("asio2"); io.run(); }); - static boost::thread io_t3([=]{ fc3 = &fc::thread::current(); fc3->set_name("asio3"); io.run(); }); - */ - if (cleanup) - { - io.stop(); - fc1.quit(); - fc2.quit(); - fc3.quit(); - future1.wait(); - future2.wait(); - future3.wait(); - } + static boost::thread io_t([=] + { + try { + fc::thread::current().set_name("asio"); + io.run(); + } + catch(...) + { + elog( "unexpected asio exception" ); + } + } + ); return io; } diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp index d436f9e..0b11e3c 100644 --- a/src/network/tcp_socket.cpp +++ b/src/network/tcp_socket.cpp @@ -75,14 +75,22 @@ namespace fc { bool tcp_server::accept( tcp_socket& s ) { - if( !my ) return false; - fc::promise::ptr p( new promise("tcp::accept") ); - my->_accept.async_accept( s.my->_sock, [=]( const boost::system::error_code& e ) { - p->set_value(e); - } ); - auto ec = p->wait(); - if( ec ) FC_THROW_EXCEPTION( exception, "system error: ${message}", ("message", fc::string(boost::system::system_error(ec).what()) )); - return true; + try + { + if( !my ) return false; + + fc::asio::tcp::accept( my->_accept, s.my->_sock ); + /* + fc::promise::ptr p( new promise("tcp::accept") ); + my->_accept.async_accept( s.my->_sock, [=]( const boost::system::error_code& e ) { + p->set_value(e); + } ); + auto ec = p->wait(); + if( ec ) FC_THROW_EXCEPTION( exception, "system error: ${message}", ("message", fc::string(boost::system::system_error(ec).what()) )); + return true; + */ + return true; + } FC_RETHROW_EXCEPTIONS( warn, "Unable to accept connection on socket." ); } void tcp_server::listen( uint16_t port ) { if( my ) delete my;