diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index ef8f6b9..4eac5da 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -19,9 +19,27 @@ namespace asio { namespace detail { using namespace fc; - void read_write_handler( const promise::ptr& p, - const boost::system::error_code& ec, - size_t bytes_transferred ); + class read_write_handler + { + public: + read_write_handler(const promise::ptr& p); + void operator()(const boost::system::error_code& ec, size_t bytes_transferred); + private: + promise::ptr _completion_promise; + }; + + class read_write_handler_with_buffer : public read_write_handler + { + public: + read_write_handler_with_buffer(const promise::ptr& p, + const std::shared_ptr& buffer); + private: + std::shared_ptr _buffer; + }; + + //void read_write_handler( const promise::ptr& p, + // const boost::system::error_code& ec, + // size_t bytes_transferred ); void read_write_handler_ec( promise* p, boost::system::error_code* oec, const boost::system::error_code& ec, @@ -62,7 +80,7 @@ namespace asio { template size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::read")); - boost::asio::async_read( s, buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); + boost::asio::async_read( s, buf, detail::read_write_handler(p) ); return p->wait(); } /** @@ -81,15 +99,47 @@ namespace asio { template future read_some(AsyncReadStream& s, const MutableBufferSequence& buf) { - promise::ptr p(new promise("fc::asio::async_read_some")); - s.async_read_some(buf, boost::bind(detail::read_write_handler, p, _1, _2)); - return p;//->wait(); + promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + s.async_read_some(buf, detail::read_write_handler(completion_promise)); + return completion_promise;//->wait(); + } + + template + future read_some(AsyncReadStream& s, char* buffer, size_t length) + { + promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + s.async_read_some(boost::asio::buffer(buffer, length), + detail::read_write_handler(completion_promise)); + return completion_promise;//->wait(); + } + + template + future read_some(AsyncReadStream& s, const std::shared_ptr& buffer, size_t length) + { + promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + s.async_read_some(boost::asio::buffer(buffer.get(), length), + detail::read_write_handler_with_buffer(completion_promise, buffer)); + return completion_promise;//->wait(); } template void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise::ptr completion_promise) { - s.async_read_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2)); + s.async_read_some(buf, detail::read_write_handler(completion_promise)); + } + + template + void async_read_some(AsyncReadStream& s, char* buffer, + size_t length, promise::ptr completion_promise) + { + s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise)); + } + + template + void async_read_some(AsyncReadStream& s, const std::shared_ptr& buffer, + size_t length, promise::ptr completion_promise) + { + s.async_read_some(boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(completion_promise, buffer)); } template @@ -107,7 +157,7 @@ namespace asio { template size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write")); - boost::asio::async_write(s, buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); + boost::asio::async_write(s, buf, detail::read_write_handler(p)); return p->wait(); } @@ -119,7 +169,23 @@ namespace asio { template future write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write_some")); - s.async_write_some( buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); + s.async_write_some( buf, detail::read_write_handler(p)); + return p; //->wait(); + } + + template + future write_some( AsyncWriteStream& s, const char* buffer, + size_t length ) { + promise::ptr p(new promise("fc::asio::write_some")); + s.async_write_some( boost::asio::buffer(buffer, length), detail::read_write_handler(p)); + return p; //->wait(); + } + + template + future write_some( AsyncWriteStream& s, const std::shared_ptr& buffer, + size_t length ) { + promise::ptr p(new promise("fc::asio::write_some")); + s.async_write_some( boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(p, buffer)); return p; //->wait(); } @@ -130,7 +196,21 @@ namespace asio { */ template void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise::ptr completion_promise) { - s.async_write_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2)); + s.async_write_some(buf, detail::read_write_handler(completion_promise)); + } + + template + void async_write_some(AsyncWriteStream& s, const char* buffer, + size_t length, promise::ptr completion_promise) { + s.async_write_some(boost::asio::buffer(buffer, length), + detail::read_write_handler(completion_promise)); + } + + template + void async_write_some(AsyncWriteStream& s, const std::shared_ptr& buffer, + size_t length, promise::ptr completion_promise) { + s.async_write_some(boost::asio::buffer(buffer.get(), length), + detail::read_write_handler_with_buffer(completion_promise, buffer)); } namespace tcp { diff --git a/include/fc/io/iostream.hpp b/include/fc/io/iostream.hpp index 27945ee..f743048 100644 --- a/include/fc/io/iostream.hpp +++ b/include/fc/io/iostream.hpp @@ -28,6 +28,7 @@ namespace fc { * @throws fc::eof_exception if len bytes cannot be read **/ istream& read( char* buf, size_t len ); + istream& read( const std::shared_ptr& buf, size_t len ); char get(); }; typedef std::shared_ptr istream_ptr; @@ -50,6 +51,7 @@ namespace fc { * but not flushed. **/ ostream& write( const char* buf, size_t len ); + ostream& write( const std::shared_ptr& buf, size_t len ); }; typedef std::shared_ptr ostream_ptr; diff --git a/include/fc/network/tcp_socket.hpp b/include/fc/network/tcp_socket.hpp index 384b65d..33ca221 100644 --- a/include/fc/network/tcp_socket.hpp +++ b/include/fc/network/tcp_socket.hpp @@ -32,12 +32,14 @@ namespace fc { /// istream interface /// @{ virtual size_t readsome( char* buffer, size_t max ); + virtual size_t readsome( const std::shared_ptr& buffer, size_t max ); virtual bool eof()const; /// @} /// ostream interface /// @{ virtual size_t writesome( const char* buffer, size_t len ); + virtual size_t writesome( const std::shared_ptr& buffer, size_t len ); virtual void flush(); virtual void close(); /// @} diff --git a/include/fc/network/tcp_socket_io_hooks.hpp b/include/fc/network/tcp_socket_io_hooks.hpp index a317ed1..e3d98dd 100644 --- a/include/fc/network/tcp_socket_io_hooks.hpp +++ b/include/fc/network/tcp_socket_io_hooks.hpp @@ -1,4 +1,5 @@ #include +#include namespace fc { @@ -7,6 +8,8 @@ namespace fc public: virtual ~tcp_socket_io_hooks() {} virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) = 0; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) = 0; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) = 0; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) = 0; }; } // namesapce fc diff --git a/include/fc/network/udp_socket.hpp b/include/fc/network/udp_socket.hpp index e5f0d5e..cef6eb6 100644 --- a/include/fc/network/udp_socket.hpp +++ b/include/fc/network/udp_socket.hpp @@ -1,6 +1,7 @@ #pragma once #include #include +#include namespace fc { namespace ip { @@ -22,6 +23,7 @@ namespace fc { void set_receive_buffer_size( size_t s ); void bind( const fc::ip::endpoint& ); size_t receive_from( char* b, size_t l, fc::ip::endpoint& from ); + size_t receive_from( std::shared_ptr b, size_t l, fc::ip::endpoint& from ); size_t send_to( const char* b, size_t l, const fc::ip::endpoint& to ); void close(); diff --git a/src/asio.cpp b/src/asio.cpp index 6432ea8..48e2855 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -6,32 +6,26 @@ namespace fc { namespace asio { namespace detail { - void read_write_handler( const promise::ptr& p, const boost::system::error_code& ec, size_t bytes_transferred ) { - if( !ec ) p->set_value(bytes_transferred); - else { - // elog( "%s", boost::system::system_error(ec).what() ); - // p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); -#if 0 - if( ec == boost::asio::error::operation_aborted ) - { - p->set_exception( fc::exception_ptr( new fc::canceled_exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - else -#endif - if( ec == boost::asio::error::eof ) - { - p->set_exception( fc::exception_ptr( new fc::eof_exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - else - { - // elog( "${message} ", ("message", boost::system::system_error(ec).what())); - p->set_exception( fc::exception_ptr( new fc::exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - } - } + + read_write_handler::read_write_handler(const promise::ptr& completion_promise) : + _completion_promise(completion_promise) + {} + void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred) + { + if( !ec ) + _completion_promise->set_value(bytes_transferred); + else if( ec == boost::asio::error::eof ) + _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + else + _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + } + read_write_handler_with_buffer::read_write_handler_with_buffer(const promise::ptr& completion_promise, + const std::shared_ptr& buffer) : + read_write_handler(completion_promise), + _buffer(buffer) + {} + + void read_write_handler_ec( promise* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) { p->set_value(bytes_transferred); *oec = ec; @@ -42,14 +36,6 @@ namespace fc { p->set_value(); else { -#if 0 - if( ec == boost::asio::error::operation_aborted ) - { - p->set_exception( fc::exception_ptr( new fc::canceled_exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - else -#endif if( ec == boost::asio::error::eof ) { p->set_exception( fc::exception_ptr( new fc::eof_exception( diff --git a/src/io/iostream.cpp b/src/io/iostream.cpp index 6eef009..ddb370e 100644 --- a/src/io/iostream.cpp +++ b/src/io/iostream.cpp @@ -352,18 +352,34 @@ namespace fc { istream& istream::read( char* buf, size_t len ) { - auto pos = buf; + char* pos = buf; while( size_t(pos-buf) < len ) pos += readsome( pos, len - (pos - buf) ); return *this; } + istream& istream::read( const std::shared_ptr& buf, size_t len ) + { + char* pos = buf.get(); + while( size_t(pos-buf.get()) < len ) + pos += readsome( pos, len - (pos - buf.get()) ); + return *this; + } + ostream& ostream::write( const char* buf, size_t len ) { - auto pos = buf; + const char* pos = buf; while( size_t(pos-buf) < len ) pos += writesome( pos, len - (pos - buf) ); return *this; } + ostream& ostream::write( const std::shared_ptr& buf, size_t len ) + { + const char* pos = buf.get(); + while( size_t(pos-buf.get()) < len ) + pos += writesome( pos, len - (pos - buf.get()) ); + return *this; + } + } // namespace fc diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 500731e..07163e2 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -95,6 +95,10 @@ namespace fc { assert(_ntp_thread.is_current()); + uint32_t receive_buffer_size = sizeof(uint64_t) * 1024; + std::shared_ptr receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; }); + uint64_t* recv_buf = (uint64_t*)receive_buffer.get(); + //outer while to restart read-loop if exception is thrown while waiting to receive on socket. //while( !_read_loop_done.canceled() ) { @@ -108,10 +112,9 @@ namespace fc while( !_read_loop_done.canceled() ) { fc::ip::endpoint from; - std::array recv_buf; try { - _sock.receive_from( (char*)recv_buf.data(), recv_buf.size(), from ); + _sock.receive_from( receive_buffer, receive_buffer_size, from ); } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket"); uint64_t receive_timestamp_net_order = recv_buf[4]; diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index aac30ae..2647ca4 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -38,7 +38,8 @@ namespace fc { public: boost::asio::ip::tcp::socket& socket; - const char* buffer; + const char* raw_buffer; + std::shared_ptr shared_buffer; rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket, const char* buffer, @@ -46,20 +47,36 @@ namespace fc promise::ptr completion_promise) : rate_limited_operation(length, std::move(completion_promise)), socket(socket), - buffer(buffer) + raw_buffer(buffer) + {} + rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket, + const std::shared_ptr& buffer, + size_t length, + promise::ptr completion_promise) : + rate_limited_operation(length, std::move(completion_promise)), + socket(socket), + raw_buffer(nullptr), + shared_buffer(buffer) {} virtual void perform_operation() override { - asio::async_write_some(socket, - boost::asio::buffer(buffer, permitted_length), - completion_promise); + if (raw_buffer) + asio::async_write_some(socket, + raw_buffer, permitted_length, + completion_promise); + else + asio::async_write_some(socket, + shared_buffer, permitted_length, + completion_promise); } }; + class rate_limited_tcp_read_operation : public rate_limited_operation { public: boost::asio::ip::tcp::socket& socket; - char* buffer; + char* raw_buffer; + std::shared_ptr shared_buffer; rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket, char* buffer, @@ -67,13 +84,28 @@ namespace fc promise::ptr completion_promise) : rate_limited_operation(length, std::move(completion_promise)), socket(socket), - buffer(buffer) + raw_buffer(buffer) + {} + rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket, + const std::shared_ptr& buffer, + size_t length, + promise::ptr completion_promise) : + rate_limited_operation(length, std::move(completion_promise)), + socket(socket), + raw_buffer(nullptr), + shared_buffer(buffer) {} virtual void perform_operation() override { - asio::async_read_some(socket, - boost::asio::buffer(buffer, permitted_length), - completion_promise); + if (raw_buffer) + asio::async_read_some(socket, + raw_buffer, permitted_length, + completion_promise); + else + asio::async_read_some(socket, + shared_buffer, permitted_length, + completion_promise); + } }; @@ -177,7 +209,13 @@ namespace fc uint32_t burstiness_in_seconds = 1); virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + template + size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length); virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + template + size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length); void process_pending_reads(); void process_pending_writes(); @@ -202,7 +240,18 @@ namespace fc { } - size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) + size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return readsome_impl(socket, buffer, length); + } + + size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) + { + return readsome_impl(socket, buffer, length); + } + + template + size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length) { size_t bytes_read; if (_download_bytes_per_second) @@ -217,17 +266,38 @@ namespace fc else if (_new_read_operation_available_promise) _new_read_operation_available_promise->set_value(); - bytes_read = completion_promise->wait(); + try + { + bytes_read = completion_promise->wait(); + } + catch (...) + { + _read_operations_for_next_iteration.remove(&read_operation); + _read_operations_in_progress.remove(&read_operation); + throw; + } _unused_read_tokens += read_operation.permitted_length - bytes_read; } else - bytes_read = asio::read_some(socket, boost::asio::buffer(buffer, length)); + bytes_read = asio::read_some(socket, buffer, length); _actual_download_rate.update(bytes_read); return bytes_read; } - size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) + + size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) + { + return writesome_impl(socket, buffer, length); + } + + size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return writesome_impl(socket, buffer, length); + } + + template + size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length) { size_t bytes_written; if (_upload_bytes_per_second) @@ -242,16 +312,26 @@ namespace fc else if (_new_write_operation_available_promise) _new_write_operation_available_promise->set_value(); - bytes_written = completion_promise->wait(); + try + { + bytes_written = completion_promise->wait(); + } + catch (...) + { + _write_operations_for_next_iteration.remove(&write_operation); + _write_operations_in_progress.remove(&write_operation); + throw; + } _unused_write_tokens += write_operation.permitted_length - bytes_written; } else - bytes_written = asio::write_some(socket, boost::asio::buffer(buffer, length)); + bytes_written = asio::write_some(socket, buffer, length); _actual_upload_rate.update(bytes_written); return bytes_written; } + void rate_limiting_group_impl::process_pending_reads() { for (;;) @@ -348,8 +428,9 @@ namespace fc { if ((*iter)->permitted_length > 0) { - (*iter)->perform_operation(); + rate_limited_operation* operation_to_perform = *iter; iter = operations_in_progress.erase(iter); + operation_to_perform->perform_operation(); } else ++iter; @@ -362,13 +443,13 @@ namespace fc // the operation immediately without being queued up. This should only be hit if // we change from a limited rate to unlimited for (auto iter = operations_in_progress.begin(); - iter != operations_in_progress.end(); - ++iter) + iter != operations_in_progress.end();) { - (*iter)->permitted_length = (*iter)->length; - (*iter)->perform_operation(); + rate_limited_operation* operation_to_perform = *iter; + iter = operations_in_progress.erase(iter); + operation_to_perform->permitted_length = operation_to_perform->length; + operation_to_perform->perform_operation(); } - operations_in_progress.clear(); } last_iteration_start_time = this_iteration_start_time; } diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp index 7604709..1d67897 100644 --- a/src/network/tcp_socket.cpp +++ b/src/network/tcp_socket.cpp @@ -13,7 +13,7 @@ namespace fc { - class tcp_socket::impl : public tcp_socket_io_hooks{ + class tcp_socket::impl : public tcp_socket_io_hooks { public: impl() : _sock(fc::asio::default_io_service()), @@ -23,11 +23,27 @@ namespace fc { { if( _sock.is_open() ) _sock.close(); - if( _read_in_progress.valid() ) try { _read_in_progress.wait(); } catch ( ... ) {} - if( _write_in_progress.valid() ) try { _write_in_progress.wait(); } catch ( ... ) {} + if( _read_in_progress.valid() ) + try + { + _read_in_progress.wait(); + } + catch ( ... ) + { + } + if( _write_in_progress.valid() ) + try + { + _write_in_progress.wait(); + } + catch ( ... ) + { + } } virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; fc::future _write_in_progress; fc::future _read_in_progress; @@ -37,11 +53,19 @@ namespace fc { size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) { - return (_read_in_progress = fc::asio::read_some(socket, boost::asio::buffer(buffer, length))).wait(); + return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait(); + } + size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait(); } size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) { - return (_write_in_progress = fc::asio::write_some(socket, boost::asio::buffer(buffer, length))).wait(); + return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait(); + } + size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait(); } @@ -72,7 +96,13 @@ namespace fc { return !my->_sock.is_open(); } - size_t tcp_socket::writesome(const char* buf, size_t len) { + size_t tcp_socket::writesome(const char* buf, size_t len) + { + return my->_io_hooks->writesome(my->_sock, buf, len); + } + + size_t tcp_socket::writesome(const std::shared_ptr& buf, size_t len) + { return my->_io_hooks->writesome(my->_sock, buf, len); } @@ -97,7 +127,12 @@ namespace fc { FC_RETHROW_EXCEPTIONS( warn, "error getting socket's local endpoint" ); } - size_t tcp_socket::readsome( char* buf, size_t len ) { + size_t tcp_socket::readsome( char* buf, size_t len ) + { + return my->_io_hooks->readsome(my->_sock, buf, len); + } + + size_t tcp_socket::readsome( const std::shared_ptr& buf, size_t len ) { return my->_io_hooks->readsome(my->_sock, buf, len); } diff --git a/src/network/udp_socket.cpp b/src/network/udp_socket.cpp index 5d4a664..9a1745a 100644 --- a/src/network/udp_socket.cpp +++ b/src/network/udp_socket.cpp @@ -73,6 +73,32 @@ namespace fc { void udp_socket::bind( const fc::ip::endpoint& e ) { my->_sock.bind( to_asio_ep(e) ); } + + size_t udp_socket::receive_from( std::shared_ptr receive_buffer, size_t receive_buffer_length, fc::ip::endpoint& from ) + { + try + { + boost::asio::ip::udp::endpoint boost_from_endpoint; + size_t bytes_read = my->_sock.receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length), boost_from_endpoint ); + from = to_fc_ep(boost_from_endpoint); + return bytes_read; + } + catch( const boost::system::system_error& e ) + { + if( e.code() != boost::asio::error::would_block ) + throw; + } + + boost::asio::ip::udp::endpoint boost_from_endpoint; + promise::ptr completion_promise(new promise("udp_socket::receive_from")); + my->_sock.async_receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length), + boost_from_endpoint, + asio::detail::read_write_handler_with_buffer(completion_promise, receive_buffer) ); + size_t bytes_read = completion_promise->wait(); + from = to_fc_ep(boost_from_endpoint); + return bytes_read; + } + size_t udp_socket::receive_from( char* receive_buffer, size_t receive_buffer_length, fc::ip::endpoint& from ) { try @@ -91,13 +117,7 @@ namespace fc { boost::asio::ip::udp::endpoint boost_from_endpoint; promise::ptr completion_promise(new promise("udp_socket::receive_from")); my->_sock.async_receive_from( boost::asio::buffer(receive_buffer, receive_buffer_length), boost_from_endpoint, - [=]( const boost::system::error_code& ec, size_t bytes_transferred ) { - if( !ec ) - completion_promise->set_value(bytes_transferred); - else - completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", - ("message", boost::system::system_error(ec).what())) ) ) ); - }); + asio::detail::read_write_handler(completion_promise) ); size_t bytes_read = completion_promise->wait(); from = to_fc_ep(boost_from_endpoint); return bytes_read;