peerplays-fc/src/asio.cpp

181 lines
7.3 KiB
C++
Raw Normal View History

2012-09-09 04:25:43 +00:00
#include <fc/asio.hpp>
#include <fc/thread/thread.hpp>
2012-09-09 04:25:43 +00:00
#include <boost/thread.hpp>
#include <fc/log/logger.hpp>
#include <fc/exception/exception.hpp>
2012-09-09 04:25:43 +00:00
namespace fc {
namespace asio {
namespace detail {
/// Builds a completion handler bound to @p completion_promise; operator()
/// later fulfils that promise (or fails it with an exception).
read_write_handler::read_write_handler(const promise<size_t>::ptr& completion_promise)
  : _completion_promise(completion_promise)
{
  // Debugging aid: put an assert(false) here to detect call sites that are
  // not passing in a shared buffer.
}
/// Completion callback for an asynchronous read/write.
///
/// @param ec                 result of the operation.
/// @param bytes_transferred  byte count handed to the promise on success.
///
/// On success the promise receives @p bytes_transferred.  An EOF error is
/// reported as fc::eof_exception, any other error as fc::exception; both
/// carry the text of the corresponding boost::system::system_error.
void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
{
  // Debugging aid: put an assert(false) here to detect call sites that are
  // not passing in a shared buffer.
  if( !ec )
  {
    _completion_promise->set_value(bytes_transferred);
    return;
  }

  if( ec == boost::asio::error::eof )
  {
    _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception(
        FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
    return;
  }

  _completion_promise->set_exception( fc::exception_ptr( new fc::exception(
      FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
/// Builds a completion handler that stores both the promise to complete and
/// a shared pointer to the I/O buffer.
///
/// @param completion_promise  promise fulfilled by operator().
/// @param buffer              shared buffer handle copied into the handler
///                            (presumably so the buffer outlives the
///                            asynchronous operation -- confirm in the header).
read_write_handler_with_buffer::read_write_handler_with_buffer(
    const promise<size_t>::ptr& completion_promise,
    const std::shared_ptr<const char>& buffer)
  : _completion_promise(completion_promise)
  , _buffer(buffer)
{
}
/// Completion callback for an asynchronous read/write that holds a buffer.
///
/// @param ec                 result of the operation.
/// @param bytes_transferred  byte count handed to the promise on success.
///
/// On success the promise receives @p bytes_transferred.  An EOF error is
/// reported as fc::eof_exception, any other error as fc::exception; both
/// carry the text of the corresponding boost::system::system_error.
void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
{
  if( !ec )
  {
    _completion_promise->set_value(bytes_transferred);
    return;
  }

  if( ec == boost::asio::error::eof )
  {
    _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception(
        FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
    return;
  }

  _completion_promise->set_exception( fc::exception_ptr( new fc::exception(
      FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
2012-09-09 04:25:43 +00:00
/// Completion callback that reports the error code through an out-parameter
/// instead of an exception.
///
/// @param p                  promise that receives @p bytes_transferred.
/// @param oec                location that receives a copy of @p ec.
/// @param ec                 result of the asynchronous operation.
/// @param bytes_transferred  number of bytes transferred.
void read_write_handler_ec( promise<size_t>* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) {
  // Publish the error code BEFORE fulfilling the promise: a thread blocked on
  // the promise may resume as soon as set_value() is called, at which point
  // it reads *oec (and, if oec points into that thread's frame, the storage
  // may stop existing shortly afterwards).
  *oec = ec;
  p->set_value(bytes_transferred);
}
void error_handler( const promise<void>::ptr& p,
2012-09-09 04:25:43 +00:00
const boost::system::error_code& ec ) {
if( !ec )
p->set_value();
else
{
if( ec == boost::asio::error::eof )
{
p->set_exception( fc::exception_ptr( new fc::eof_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
else
{
//elog( "${message} ", ("message", boost::system::system_error(ec).what()));
p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
}
2012-09-09 04:25:43 +00:00
}
void error_handler_ec( promise<boost::system::error_code>* p,
2012-09-09 04:25:43 +00:00
const boost::system::error_code& ec ) {
p->set_value(ec);
}
template<typename EndpointType, typename IteratorType>
void resolve_handler(
2012-09-09 04:25:43 +00:00
const typename promise<std::vector<EndpointType> >::ptr& p,
const boost::system::error_code& ec,
2012-09-09 04:25:43 +00:00
IteratorType itr) {
if( !ec ) {
std::vector<EndpointType> eps;
while( itr != IteratorType() ) {
eps.push_back(*itr);
++itr;
}
p->set_value( eps );
} else {
//elog( "%s", boost::system::system_error(ec).what() );
//p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) );
p->set_exception(
fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "process exited with: ${message} ",
("message", boost::system::system_error(ec).what())) ) ) );
2012-09-09 04:25:43 +00:00
}
}
}
/// Bundles one boost::asio::io_service with the work object that keeps it
/// running and the thread that executes io_service::run().
///
/// Resources are released only by an explicit cleanup() call; the destructor
/// deliberately does nothing.
struct default_io_service_scope
{
  boost::asio::io_service*       io;          ///< the service; null after cleanup()
  boost::thread*                 asio_thread; ///< thread running io->run(); null after cleanup()
  boost::asio::io_service::work* the_work;    ///< work object created on *io; null after cleanup()

  /// Creates the io_service and its work object, then starts the thread
  /// that runs the service.
  default_io_service_scope()
  {
    io = new boost::asio::io_service();
    the_work = new boost::asio::io_service::work(*io);
    // Capture `this` explicitly (the original relied on `[=]` capturing it
    // implicitly in order to reach the `io` member).
    asio_thread = new boost::thread( [this]()
    {
      fc::thread::current().set_name("asio");
      // run() leaves via an exception when a handler throws; log it and
      // re-enter run() until the service has actually been stopped.
      while (!io->stopped())
      {
        try
        {
          io->run();
        }
        catch (const fc::exception& e)
        {
          elog("Caught unhandled exception in asio service loop: ${e}", ("e", e));
        }
        catch (const std::exception& e)
        {
          elog("Caught unhandled exception in asio service loop: ${e}", ("e", e.what()));
        }
        catch (...)
        {
          elog("Caught unhandled exception in asio service loop");
        }
      }
    });
  }

  /// Stops the service, joins the thread and frees everything.
  ///
  /// Safe to call more than once: after the first call all three pointers
  /// are null and later calls return immediately (previously a second call
  /// would delete the same objects again).
  void cleanup()
  {
    if( !io )
      return; // already cleaned up

    delete the_work;
    the_work = nullptr;

    io->stop();
    asio_thread->join();

    // The worker thread has been joined, so nothing reads `io` any more.
    delete io;
    io = nullptr;

    delete asio_thread;
    asio_thread = nullptr;
  }

  /// Intentionally empty: teardown happens only through cleanup().
  ~default_io_service_scope()
  {}
};
/// Returns the io_service owned by the first of four function-local static
/// default_io_service_scope objects (each scope starts its own thread when
/// constructed on first call).
///
/// @param cleanup  when true, cleanup() is invoked on every scope before
///                 returning; in that case the returned reference refers to
///                 a released io_service and must not be used.
boost::asio::io_service& default_io_service(bool cleanup) {
  static default_io_service_scope fc_asio_service[4];

  if( cleanup ) {
    for( default_io_service_scope& scope : fc_asio_service )
      scope.cleanup();
  }

  default_io_service_scope& primary = fc_asio_service[0];
  return *primary.io;
}
namespace tcp {
std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port)
{
try
{
resolver res( fc::asio::default_io_service() );
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") );
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait();;
2012-09-09 04:25:43 +00:00
}
FC_RETHROW_EXCEPTIONS(warn, "")
}
2012-09-09 04:25:43 +00:00
}
namespace udp {
  /// Resolves @p hostname / @p port to UDP endpoints.
  ///
  /// Starts an asynchronous resolve on the default io_service and blocks on
  /// the completion promise until detail::resolve_handler fulfils or fails it.
  ///
  /// @param r         not used by this implementation; the lookup runs on a
  ///                  locally constructed resolver instead.
  /// @param hostname  host name passed to the resolver query.
  /// @param port      port or service name passed to the resolver query.
  /// @return the endpoints delivered through the promise.
  /// @throws whatever FC_RETHROW_EXCEPTIONS produces for an exception raised
  ///         inside the try block (including a failed resolve).
  std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port)
  {
    try
    {
      typedef std::vector<endpoint> endpoint_list;

      resolver dns_resolver( fc::asio::default_io_service() );
      promise<endpoint_list>::ptr completion( new promise<endpoint_list>("udp::resolve completion") );

      const resolver::query lookup( hostname, port );
      dns_resolver.async_resolve(
          lookup,
          boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, completion, _1, _2 ) );

      return completion->wait();
    }
    FC_RETHROW_EXCEPTIONS(warn, "")
  }
}
2012-09-09 04:25:43 +00:00
} } // namespace fc::asio