peerplays-fc/include/fc/asio.hpp

296 lines
12 KiB
C++
Raw Permalink Normal View History

2012-09-08 06:41:28 +00:00
/**
* @file fc/cmt/asio.hpp
* @brief defines wrappers for boost::asio functions
*/
2013-01-11 14:12:53 +00:00
#pragma once
2012-09-08 06:41:28 +00:00
#include <boost/asio.hpp>
2012-09-09 03:46:19 +00:00
#include <boost/bind.hpp>
#include <fc/thread/future.hpp>
#include <fc/io/iostream.hpp>
2012-09-08 06:41:28 +00:00
namespace fc {
/**
2012-09-09 03:46:19 +00:00
* @brief defines fc wrappers for boost::asio functions.
2012-09-08 06:41:28 +00:00
*/
namespace asio {
/**
2012-09-09 03:46:19 +00:00
* @brief internal implementation types/methods for fc::asio
2012-09-08 06:41:28 +00:00
*/
namespace detail {
using namespace fc;
class read_write_handler
{
public:
read_write_handler(const promise<size_t>::ptr& p);
void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
private:
promise<size_t>::ptr _completion_promise;
};
class read_write_handler_with_buffer
{
public:
read_write_handler_with_buffer(const promise<size_t>::ptr& p,
const std::shared_ptr<const char>& buffer);
void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
private:
promise<size_t>::ptr _completion_promise;
std::shared_ptr<const char> _buffer;
};
void error_handler( const promise<void>::ptr& p,
const boost::system::error_code& ec );
2012-09-08 06:41:28 +00:00
template<typename C>
struct non_blocking {
bool operator()( C& c ) { return c.non_blocking(); }
bool operator()( C& c, bool s ) { c.non_blocking(s); return true; }
};
#if WIN32 // windows stream handles do not support non blocking!
template<>
struct non_blocking<boost::asio::windows::stream_handle> {
typedef boost::asio::windows::stream_handle C;
bool operator()( C& ) { return false; }
bool operator()( C&, bool ) { return false; }
2012-09-08 06:41:28 +00:00
};
#endif
2012-09-08 06:41:28 +00:00
}
/**
2012-09-09 03:46:19 +00:00
* @return the default boost::asio::io_service for use with fc::asio
2012-09-08 06:41:28 +00:00
*
* This IO service is automatically running in its own thread to service asynchronous
* requests without blocking any other threads.
*/
boost::asio::io_service& default_io_service(bool cleanup = false);
2012-09-08 06:41:28 +00:00
/**
* @brief wraps boost::asio::async_read
* @pre s.non_blocking() == true
* @return the number of bytes read.
*/
template<typename AsyncReadStream, typename MutableBufferSequence>
size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) {
2012-09-09 03:46:19 +00:00
promise<size_t>::ptr p(new promise<size_t>("fc::asio::read"));
boost::asio::async_read( s, buf, detail::read_write_handler(p) );
2012-09-08 06:41:28 +00:00
return p->wait();
}
/**
* This method will read at least 1 byte from the stream and will
* cooperatively block until that byte is available or an error occurs.
*
* If the stream is not in 'non-blocking' mode it will be put in 'non-blocking'
* mode it the stream supports s.non_blocking() and s.non_blocking(bool).
*
* If in non blocking mode, the call will be synchronous avoiding heap allocs
* and context switching. If the sync call returns 'would block' then an
* promise is created and an async read is generated.
*
* @return the number of bytes read.
*/
template<typename AsyncReadStream, typename MutableBufferSequence>
future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(buf, detail::read_write_handler(completion_promise));
return completion_promise;//->wait();
}
template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(boost::asio::buffer(buffer + offset, length),
detail::read_write_handler(completion_promise));
return completion_promise;//->wait();
}
template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(boost::asio::buffer(buffer.get() + offset, length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
return completion_promise;//->wait();
}
template<typename AsyncReadStream, typename MutableBufferSequence>
void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise<size_t>::ptr completion_promise)
{
s.async_read_some(buf, detail::read_write_handler(completion_promise));
}
template<typename AsyncReadStream>
void async_read_some(AsyncReadStream& s, char* buffer,
size_t length, promise<size_t>::ptr completion_promise)
{
s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise));
}
template<typename AsyncReadStream>
void async_read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer,
size_t length, size_t offset, promise<size_t>::ptr completion_promise)
{
s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer));
2012-09-08 06:41:28 +00:00
}
template<typename AsyncReadStream>
size_t read_some( AsyncReadStream& s, boost::asio::streambuf& buf )
{
char buffer[1024];
size_t bytes_read = read_some( s, boost::asio::buffer( buffer, sizeof(buffer) ) );
buf.sputn( buffer, bytes_read );
return bytes_read;
}
2012-09-08 06:41:28 +00:00
/** @brief wraps boost::asio::async_write
* @return the number of bytes written
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
2012-09-09 03:46:19 +00:00
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write"));
boost::asio::async_write(s, buf, detail::read_write_handler(p));
2012-09-08 06:41:28 +00:00
return p->wait();
}
/**
* @pre s.non_blocking() == true
* @brief wraps boost::asio::async_write_some
* @return the number of bytes written
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
2012-09-09 03:46:19 +00:00
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( buf, detail::read_write_handler(p));
return p; //->wait();
}
template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const char* buffer,
size_t length, size_t offset = 0) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p));
return p; //->wait();
}
template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, size_t offset ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer));
return p; //->wait();
2012-09-08 06:41:28 +00:00
}
/**
* @pre s.non_blocking() == true
* @brief wraps boost::asio::async_write_some
* @return the number of bytes written
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise<size_t>::ptr completion_promise) {
s.async_write_some(buf, detail::read_write_handler(completion_promise));
}
template<typename AsyncWriteStream>
void async_write_some(AsyncWriteStream& s, const char* buffer,
size_t length, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer, length),
detail::read_write_handler(completion_promise));
}
template<typename AsyncWriteStream>
void async_write_some(AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, size_t offset, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer.get() + offset, length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
}
2012-09-08 06:41:28 +00:00
namespace tcp {
typedef boost::asio::ip::tcp::endpoint endpoint;
typedef boost::asio::ip::tcp::resolver::iterator resolver_iterator;
typedef boost::asio::ip::tcp::resolver resolver;
std::vector<endpoint> resolve( const std::string& hostname, const std::string& port );
/** @brief wraps boost::asio::async_accept
* @post sock is connected
* @post sock.non_blocking() == true
* @throw on error.
*/
template<typename SocketType, typename AcceptorType>
void accept( AcceptorType& acc, SocketType& sock ) {
//promise<boost::system::error_code>::ptr p( new promise<boost::system::error_code>("fc::asio::tcp::accept") );
promise<void>::ptr p( new promise<void>("fc::asio::tcp::accept") );
2012-09-09 03:46:19 +00:00
acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
p->wait();
//if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
2012-09-08 06:41:28 +00:00
}
/** @brief wraps boost::asio::socket::async_connect
* @post sock.non_blocking() == true
* @throw on error
*/
template<typename AsyncSocket, typename EndpointType>
void connect( AsyncSocket& sock, const EndpointType& ep ) {
promise<void>::ptr p(new promise<void>("fc::asio::tcp::connect"));
2012-09-09 03:46:19 +00:00
sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
p->wait();
//if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
2012-09-08 06:41:28 +00:00
}
}
namespace udp {
typedef boost::asio::ip::udp::endpoint endpoint;
typedef boost::asio::ip::udp::resolver::iterator resolver_iterator;
typedef boost::asio::ip::udp::resolver resolver;
/// @brief resolve all udp::endpoints for hostname:port
std::vector<endpoint> resolve( resolver& r, const std::string& hostname,
const std::string& port );
2012-09-08 06:41:28 +00:00
}
template<typename AsyncReadStream>
class istream : public virtual fc::istream
{
public:
istream( std::shared_ptr<AsyncReadStream> str )
:_stream( fc::move(str) ){}
virtual size_t readsome( char* buf, size_t len )
{
return fc::asio::read_some(*_stream, buf, len).wait();
}
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
return fc::asio::read_some(*_stream, buf, len, offset).wait();
}
private:
std::shared_ptr<AsyncReadStream> _stream;
};
template<typename AsyncWriteStream>
class ostream : public virtual fc::ostream
{
public:
ostream( std::shared_ptr<AsyncWriteStream> str )
:_stream( fc::move(str) ){}
virtual size_t writesome( const char* buf, size_t len )
{
return fc::asio::write_some(*_stream, buf, len).wait();
}
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
return fc::asio::write_some(*_stream, buf, len, offset).wait();
}
virtual void close(){ _stream->close(); }
virtual void flush() {}
private:
std::shared_ptr<AsyncWriteStream> _stream;
};
2012-09-08 06:41:28 +00:00
} } // namespace fc::asio