/** * @file fc/cmt/asio.hpp * @brief defines wrappers for boost::asio functions */ #pragma once #include #include #include #include namespace fc { /** * @brief defines fc wrappers for boost::asio functions. */ namespace asio { /** * @brief internal implementation types/methods for fc::asio */ namespace detail { using namespace fc; class read_write_handler { public: read_write_handler(const promise::ptr& p); void operator()(const boost::system::error_code& ec, size_t bytes_transferred); private: promise::ptr _completion_promise; }; class read_write_handler_with_buffer { public: read_write_handler_with_buffer(const promise::ptr& p, const std::shared_ptr& buffer); void operator()(const boost::system::error_code& ec, size_t bytes_transferred); private: promise::ptr _completion_promise; std::shared_ptr _buffer; }; void error_handler( const promise::ptr& p, const boost::system::error_code& ec ); template struct non_blocking { bool operator()( C& c ) { return c.non_blocking(); } bool operator()( C& c, bool s ) { c.non_blocking(s); return true; } }; #if WIN32 // windows stream handles do not support non blocking! template<> struct non_blocking { typedef boost::asio::windows::stream_handle C; bool operator()( C& ) { return false; } bool operator()( C&, bool ) { return false; } }; #endif } /** * @return the default boost::asio::io_service for use with fc::asio * * This IO service is automatically running in its own thread to service asynchronous * requests without blocking any other threads. */ boost::asio::io_service& default_io_service(bool cleanup = false); /** * @brief wraps boost::asio::async_read * @pre s.non_blocking() == true * @return the number of bytes read. */ template size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::read")); boost::asio::async_read( s, buf, detail::read_write_handler(p) ); return p->wait(); } /** * This method will read at least 1 byte from the stream and will * cooperatively block until that byte is available or an error occurs. * * If the stream is not in 'non-blocking' mode it will be put in 'non-blocking' * mode it the stream supports s.non_blocking() and s.non_blocking(bool). * * If in non blocking mode, the call will be synchronous avoiding heap allocs * and context switching. If the sync call returns 'would block' then an * promise is created and an async read is generated. * * @return the number of bytes read. */ template future read_some(AsyncReadStream& s, const MutableBufferSequence& buf) { promise::ptr completion_promise(new promise("fc::asio::async_read_some")); s.async_read_some(buf, detail::read_write_handler(completion_promise)); return completion_promise;//->wait(); } template future read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0) { promise::ptr completion_promise(new promise("fc::asio::async_read_some")); s.async_read_some(boost::asio::buffer(buffer + offset, length), detail::read_write_handler(completion_promise)); return completion_promise;//->wait(); } template future read_some(AsyncReadStream& s, const std::shared_ptr& buffer, size_t length, size_t offset) { promise::ptr completion_promise(new promise("fc::asio::async_read_some")); s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer)); return completion_promise;//->wait(); } template void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise::ptr completion_promise) { s.async_read_some(buf, detail::read_write_handler(completion_promise)); } template void async_read_some(AsyncReadStream& s, char* buffer, size_t length, promise::ptr completion_promise) { s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise)); } template void async_read_some(AsyncReadStream& s, const std::shared_ptr& buffer, size_t length, size_t offset, promise::ptr completion_promise) { s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer)); } template size_t read_some( AsyncReadStream& s, boost::asio::streambuf& buf ) { char buffer[1024]; size_t bytes_read = read_some( s, boost::asio::buffer( buffer, sizeof(buffer) ) ); buf.sputn( buffer, bytes_read ); return bytes_read; } /** @brief wraps boost::asio::async_write * @return the number of bytes written */ template size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write")); boost::asio::async_write(s, buf, detail::read_write_handler(p)); return p->wait(); } /** * @pre s.non_blocking() == true * @brief wraps boost::asio::async_write_some * @return the number of bytes written */ template future write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write_some")); s.async_write_some( buf, detail::read_write_handler(p)); return p; //->wait(); } template future write_some( AsyncWriteStream& s, const char* buffer, size_t length, size_t offset = 0) { promise::ptr p(new promise("fc::asio::write_some")); s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p)); return p; //->wait(); } template future write_some( AsyncWriteStream& s, const std::shared_ptr& buffer, size_t length, size_t offset ) { promise::ptr p(new promise("fc::asio::write_some")); s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer)); return p; //->wait(); } /** * @pre s.non_blocking() == true * @brief wraps boost::asio::async_write_some * @return the number of bytes written */ template void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise::ptr completion_promise) { s.async_write_some(buf, detail::read_write_handler(completion_promise)); } template void async_write_some(AsyncWriteStream& s, const char* buffer, size_t length, promise::ptr completion_promise) { s.async_write_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise)); } template void async_write_some(AsyncWriteStream& s, const std::shared_ptr& buffer, size_t length, size_t offset, promise::ptr completion_promise) { s.async_write_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer)); } namespace tcp { typedef boost::asio::ip::tcp::endpoint endpoint; typedef boost::asio::ip::tcp::resolver::iterator resolver_iterator; typedef boost::asio::ip::tcp::resolver resolver; std::vector resolve( const std::string& hostname, const std::string& port ); /** @brief wraps boost::asio::async_accept * @post sock is connected * @post sock.non_blocking() == true * @throw on error. */ template void accept( AcceptorType& acc, SocketType& sock ) { //promise::ptr p( new promise("fc::asio::tcp::accept") ); promise::ptr p( new promise("fc::asio::tcp::accept") ); acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); p->wait(); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); } /** @brief wraps boost::asio::socket::async_connect * @post sock.non_blocking() == true * @throw on error */ template void connect( AsyncSocket& sock, const EndpointType& ep ) { promise::ptr p(new promise("fc::asio::tcp::connect")); sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); p->wait(); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); } } namespace udp { typedef boost::asio::ip::udp::endpoint endpoint; typedef boost::asio::ip::udp::resolver::iterator resolver_iterator; typedef boost::asio::ip::udp::resolver resolver; /// @brief resolve all udp::endpoints for hostname:port std::vector resolve( resolver& r, const std::string& hostname, const std::string& port ); } template class istream : public virtual fc::istream { public: istream( std::shared_ptr str ) :_stream( fc::move(str) ){} virtual size_t readsome( char* buf, size_t len ) { return fc::asio::read_some(*_stream, buf, len).wait(); } virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ) { return fc::asio::read_some(*_stream, buf, len, offset).wait(); } private: std::shared_ptr _stream; }; template class ostream : public virtual fc::ostream { public: ostream( std::shared_ptr str ) :_stream( fc::move(str) ){} virtual size_t writesome( const char* buf, size_t len ) { return fc::asio::write_some(*_stream, buf, len).wait(); } virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ) { return fc::asio::write_some(*_stream, buf, len, offset).wait(); } virtual void close(){ _stream->close(); } virtual void flush() {} private: std::shared_ptr _stream; }; } } // namespace fc::asio