This commit is contained in:
Daniel Larimer 2015-07-23 13:17:03 -04:00
commit 2808c02cc7
2 changed files with 31 additions and 22 deletions

View file

@ -15,21 +15,21 @@ namespace fc {
void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred) void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
{ {
// assert(false); // to detect anywhere we're not passing in a shared buffer // assert(false); // to detect anywhere we're not passing in a shared buffer
if( !ec ) if( !ec )
_completion_promise->set_value(bytes_transferred); _completion_promise->set_value(bytes_transferred);
else if( ec == boost::asio::error::eof ) else if( ec == boost::asio::error::eof )
_completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
else else
_completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
} }
read_write_handler_with_buffer::read_write_handler_with_buffer(const promise<size_t>::ptr& completion_promise, read_write_handler_with_buffer::read_write_handler_with_buffer(const promise<size_t>::ptr& completion_promise,
const std::shared_ptr<const char>& buffer) : const std::shared_ptr<const char>& buffer) :
_completion_promise(completion_promise), _completion_promise(completion_promise),
_buffer(buffer) _buffer(buffer)
{} {}
void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred) void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
{ {
if( !ec ) if( !ec )
_completion_promise->set_value(bytes_transferred); _completion_promise->set_value(bytes_transferred);
else if( ec == boost::asio::error::eof ) else if( ec == boost::asio::error::eof )
_completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
@ -41,7 +41,7 @@ namespace fc {
p->set_value(bytes_transferred); p->set_value(bytes_transferred);
*oec = ec; *oec = ec;
} }
void error_handler( const promise<void>::ptr& p, void error_handler( const promise<void>::ptr& p,
const boost::system::error_code& ec ) { const boost::system::error_code& ec ) {
if( !ec ) if( !ec )
p->set_value(); p->set_value();
@ -49,27 +49,27 @@ namespace fc {
{ {
if( ec == boost::asio::error::eof ) if( ec == boost::asio::error::eof )
{ {
p->set_exception( fc::exception_ptr( new fc::eof_exception( p->set_exception( fc::exception_ptr( new fc::eof_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
} }
else else
{ {
//elog( "${message} ", ("message", boost::system::system_error(ec).what())); //elog( "${message} ", ("message", boost::system::system_error(ec).what()));
p->set_exception( fc::exception_ptr( new fc::exception( p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
} }
} }
} }
void error_handler_ec( promise<boost::system::error_code>* p, void error_handler_ec( promise<boost::system::error_code>* p,
const boost::system::error_code& ec ) { const boost::system::error_code& ec ) {
p->set_value(ec); p->set_value(ec);
} }
template<typename EndpointType, typename IteratorType> template<typename EndpointType, typename IteratorType>
void resolve_handler( void resolve_handler(
const typename promise<std::vector<EndpointType> >::ptr& p, const typename promise<std::vector<EndpointType> >::ptr& p,
const boost::system::error_code& ec, const boost::system::error_code& ec,
IteratorType itr) { IteratorType itr) {
if( !ec ) { if( !ec ) {
std::vector<EndpointType> eps; std::vector<EndpointType> eps;
@ -81,9 +81,9 @@ namespace fc {
} else { } else {
//elog( "%s", boost::system::system_error(ec).what() ); //elog( "%s", boost::system::system_error(ec).what() );
//p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); //p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) );
p->set_exception( p->set_exception(
fc::exception_ptr( new fc::exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "process exited with: ${message} ", FC_LOG_MESSAGE( error, "process exited with: ${message} ",
("message", boost::system::system_error(ec).what())) ) ) ); ("message", boost::system::system_error(ec).what())) ) ) );
} }
} }
@ -100,13 +100,13 @@ namespace fc {
io = new boost::asio::io_service(); io = new boost::asio::io_service();
the_work = new boost::asio::io_service::work(*io); the_work = new boost::asio::io_service::work(*io);
asio_thread = new boost::thread( [=]() asio_thread = new boost::thread( [=]()
{ {
fc::thread::current().set_name("asio"); fc::thread::current().set_name("asio");
io->run(); io->run();
}); });
} }
~default_io_service_scope() void cleanup()
{ {
delete the_work; delete the_work;
io->stop(); io->stop();
@ -114,20 +114,27 @@ namespace fc {
delete io; delete io;
delete asio_thread; delete asio_thread;
} }
~default_io_service_scope()
{}
}; };
/// If cleanup is true, do not use the return value; it is a null reference
boost::asio::io_service& default_io_service(bool cleanup) { boost::asio::io_service& default_io_service(bool cleanup) {
static default_io_service_scope fc_asio_service; static default_io_service_scope fc_asio_service;
if (cleanup)
fc_asio_service.cleanup();
return *fc_asio_service.io; return *fc_asio_service.io;
} }
namespace tcp { namespace tcp {
std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port) std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port)
{ {
try try
{ {
resolver res( fc::asio::default_io_service() ); resolver res( fc::asio::default_io_service() );
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") ); promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") );
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) ); boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait();; return p->wait();;
} }
@ -135,18 +142,18 @@ namespace fc {
} }
} }
namespace udp { namespace udp {
std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port) std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port)
{ {
try try
{ {
resolver res( fc::asio::default_io_service() ); resolver res( fc::asio::default_io_service() );
promise<std::vector<endpoint> >::ptr p( new promise<std::vector<endpoint> >("udp::resolve completion") ); promise<std::vector<endpoint> >::ptr p( new promise<std::vector<endpoint> >("udp::resolve completion") );
res.async_resolve( resolver::query(hostname,port), res.async_resolve( resolver::query(hostname,port),
boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) ); boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait(); return p->wait();
} }
FC_RETHROW_EXCEPTIONS(warn, "") FC_RETHROW_EXCEPTIONS(warn, "")
} }
} }
} } // namespace fc::asio } } // namespace fc::asio

View file

@ -255,6 +255,7 @@ namespace fc { namespace http {
auto cpy_con = _connections; auto cpy_con = _connections;
for( auto item : cpy_con ) for( auto item : cpy_con )
_server.close( item.first, 0, "server exit" ); _server.close( item.first, 0, "server exit" );
fc::asio::default_io_service(true);
} }
typedef std::map<connection_hdl, websocket_connection_ptr,std::owner_less<connection_hdl> > con_map; typedef std::map<connection_hdl, websocket_connection_ptr,std::owner_less<connection_hdl> > con_map;
@ -359,6 +360,7 @@ namespace fc { namespace http {
auto cpy_con = _connections; auto cpy_con = _connections;
for( auto item : cpy_con ) for( auto item : cpy_con )
_server.close( item.first, 0, "server exit" ); _server.close( item.first, 0, "server exit" );
fc::asio::default_io_service(true);
} }
typedef std::map<connection_hdl, websocket_connection_ptr,std::owner_less<connection_hdl> > con_map; typedef std::map<connection_hdl, websocket_connection_ptr,std::owner_less<connection_hdl> > con_map;