clean up exception handling on asio / sockets

This commit is contained in:
Daniel Larimer 2013-07-14 17:58:27 -04:00
parent f18e16cbc7
commit 57e5796839
4 changed files with 68 additions and 39 deletions

View file

@ -26,7 +26,7 @@ namespace asio {
boost::system::error_code* oec, boost::system::error_code* oec,
const boost::system::error_code& ec, const boost::system::error_code& ec,
size_t bytes_transferred ); size_t bytes_transferred );
void error_handler( const promise<boost::system::error_code>::ptr& p, void error_handler( const promise<void>::ptr& p,
const boost::system::error_code& ec ); const boost::system::error_code& ec );
void error_handler_ec( promise<boost::system::error_code>* p, void error_handler_ec( promise<boost::system::error_code>* p,
const boost::system::error_code& ec ); const boost::system::error_code& ec );
@ -130,10 +130,11 @@ namespace asio {
*/ */
template<typename SocketType, typename AcceptorType> template<typename SocketType, typename AcceptorType>
void accept( AcceptorType& acc, SocketType& sock ) { void accept( AcceptorType& acc, SocketType& sock ) {
promise<boost::system::error_code>::ptr p( new promise<boost::system::error_code>("fc::asio::tcp::accept") ); //promise<boost::system::error_code>::ptr p( new promise<boost::system::error_code>("fc::asio::tcp::accept") );
promise<void>::ptr p( new promise<void>("fc::asio::tcp::accept") );
acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
auto ec = p->wait(); p->wait();
if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
} }
/** @brief wraps boost::asio::socket::async_connect /** @brief wraps boost::asio::socket::async_connect
@ -142,10 +143,10 @@ namespace asio {
*/ */
template<typename AsyncSocket, typename EndpointType> template<typename AsyncSocket, typename EndpointType>
void connect( AsyncSocket& sock, const EndpointType& ep ) { void connect( AsyncSocket& sock, const EndpointType& ep ) {
promise<boost::system::error_code>::ptr p(new promise<boost::system::error_code>("fc::asio::tcp::connect")); promise<void>::ptr p(new promise<void>("fc::asio::tcp::connect"));
sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
auto ec = p->wait(); p->wait();
if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
} }
} }
namespace udp { namespace udp {

View file

@ -12,6 +12,11 @@ struct unsigned_int {
unsigned_int& operator=( const T& v ) { value = v; return *this; } unsigned_int& operator=( const T& v ) { value = v; return *this; }
uint32_t value; uint32_t value;
template<typename T>
friend bool operator==( const unsigned_int& i, const T& v ) { return v == i.value; }
template<typename T>
friend bool operator!=( const unsigned_int& i, const T& v ) { return v != i.value; }
}; };
struct signed_int { struct signed_int {

View file

@ -11,7 +11,12 @@ namespace fc {
else { else {
// elog( "%s", boost::system::system_error(ec).what() ); // elog( "%s", boost::system::system_error(ec).what() );
// p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); // p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) );
if( ec == boost::asio::error::eof ) if( ec == boost::asio::error::operation_aborted )
{
p->set_exception( fc::exception_ptr( new fc::canceled_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
else if( ec == boost::asio::error::eof )
{ {
p->set_exception( fc::exception_ptr( new fc::eof_exception( p->set_exception( fc::exception_ptr( new fc::eof_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
@ -28,9 +33,28 @@ namespace fc {
p->set_value(bytes_transferred); p->set_value(bytes_transferred);
*oec = ec; *oec = ec;
} }
void error_handler( const promise<boost::system::error_code>::ptr& p, void error_handler( const promise<void>::ptr& p,
const boost::system::error_code& ec ) { const boost::system::error_code& ec ) {
p->set_value(ec); if( !ec ) p->set_value();
else
{
if( ec == boost::asio::error::operation_aborted )
{
p->set_exception( fc::exception_ptr( new fc::canceled_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
else if( ec == boost::asio::error::eof )
{
p->set_exception( fc::exception_ptr( new fc::eof_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
else
{
// elog( "${message} ", ("message", boost::system::system_error(ec).what()));
p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
}
} }
void error_handler_ec( promise<boost::system::error_code>* p, void error_handler_ec( promise<boost::system::error_code>* p,
@ -63,27 +87,18 @@ namespace fc {
boost::asio::io_service& default_io_service(bool cleanup) { boost::asio::io_service& default_io_service(bool cleanup) {
static boost::asio::io_service io; static boost::asio::io_service io;
static boost::asio::io_service::work the_work(io); static boost::asio::io_service::work the_work(io);
static fc::thread fc1("asio1"); static boost::thread io_t([=]
static fc::thread fc2("asio2"); {
static fc::thread fc3("asio3"); try {
static fc::future<void> future1( fc1.async([=]() { io.run(); }) ); fc::thread::current().set_name("asio");
static fc::future<void> future2( fc2.async([=]() { io.run(); }) ); io.run();
static fc::future<void> future3( fc3.async([=]() { io.run(); }) ); }
/* catch(...)
static boost::thread io_t([=] { fc1 = &fc::thread::current(); fc1->set_name("asio1"); io.run(); }); {
static boost::thread io_t2([=]{ fc2 = &fc::thread::current(); fc2->set_name("asio2"); io.run(); }); elog( "unexpected asio exception" );
static boost::thread io_t3([=]{ fc3 = &fc::thread::current(); fc3->set_name("asio3"); io.run(); }); }
*/ }
if (cleanup) );
{
io.stop();
fc1.quit();
fc2.quit();
fc3.quit();
future1.wait();
future2.wait();
future3.wait();
}
return io; return io;
} }

View file

@ -75,14 +75,22 @@ namespace fc {
bool tcp_server::accept( tcp_socket& s ) { bool tcp_server::accept( tcp_socket& s ) {
if( !my ) return false; try
fc::promise<boost::system::error_code>::ptr p( new promise<boost::system::error_code>("tcp::accept") ); {
my->_accept.async_accept( s.my->_sock, [=]( const boost::system::error_code& e ) { if( !my ) return false;
p->set_value(e);
} ); fc::asio::tcp::accept( my->_accept, s.my->_sock );
auto ec = p->wait(); /*
if( ec ) FC_THROW_EXCEPTION( exception, "system error: ${message}", ("message", fc::string(boost::system::system_error(ec).what()) )); fc::promise<void>::ptr p( new promise<void>("tcp::accept") );
return true; my->_accept.async_accept( s.my->_sock, [=]( const boost::system::error_code& e ) {
p->set_value(e);
} );
auto ec = p->wait();
if( ec ) FC_THROW_EXCEPTION( exception, "system error: ${message}", ("message", fc::string(boost::system::system_error(ec).what()) ));
return true;
*/
return true;
} FC_RETHROW_EXCEPTIONS( warn, "Unable to accept connection on socket." );
} }
void tcp_server::listen( uint16_t port ) { void tcp_server::listen( uint16_t port ) {
if( my ) delete my; if( my ) delete my;