From aa6882b3b786efb95b3bc2ca7520082a7a54040d Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Tue, 9 Sep 2014 11:10:37 -0400 Subject: [PATCH] Allow us to safely cancel tasks that are executing asynchronous network reads and writes. This was previously unsafe because we almost always passed read/write buffers to boost that were on the stack. Canceling the task deleted the stack and therefore the buffer, but couldn't reliably prevent boost from writing to the buffer if data came in after the cancel. This commit adds variants of the read and write functions that take a shared_ptr instead of a raw char* as the buffer, and these variants will ensure the shared_ptr will outlive the boost::asio read/write. --- include/fc/asio.hpp | 102 +++++++++++++++-- include/fc/io/iostream.hpp | 2 + include/fc/network/tcp_socket.hpp | 2 + include/fc/network/tcp_socket_io_hooks.hpp | 3 + include/fc/network/udp_socket.hpp | 2 + src/asio.cpp | 54 ++++----- src/io/iostream.cpp | 20 +++- src/network/ntp.cpp | 7 +- src/network/rate_limiting.cpp | 125 +++++++++++++++++---- src/network/tcp_socket.cpp | 49 ++++++-- src/network/udp_socket.cpp | 34 ++++-- 11 files changed, 315 insertions(+), 85 deletions(-) diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index ef8f6b9..4eac5da 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -19,9 +19,27 @@ namespace asio { namespace detail { using namespace fc; - void read_write_handler( const promise::ptr& p, - const boost::system::error_code& ec, - size_t bytes_transferred ); + class read_write_handler + { + public: + read_write_handler(const promise::ptr& p); + void operator()(const boost::system::error_code& ec, size_t bytes_transferred); + private: + promise::ptr _completion_promise; + }; + + class read_write_handler_with_buffer : public read_write_handler + { + public: + read_write_handler_with_buffer(const promise::ptr& p, + const std::shared_ptr& buffer); + private: + std::shared_ptr _buffer; + }; + + //void read_write_handler( const promise::ptr& p, + // const boost::system::error_code& ec, + // size_t bytes_transferred ); void read_write_handler_ec( promise* p, boost::system::error_code* oec, const boost::system::error_code& ec, @@ -62,7 +80,7 @@ namespace asio { template size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::read")); - boost::asio::async_read( s, buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); + boost::asio::async_read( s, buf, detail::read_write_handler(p) ); return p->wait(); } /** @@ -81,15 +99,47 @@ namespace asio { template future read_some(AsyncReadStream& s, const MutableBufferSequence& buf) { - promise::ptr p(new promise("fc::asio::async_read_some")); - s.async_read_some(buf, boost::bind(detail::read_write_handler, p, _1, _2)); - return p;//->wait(); + promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + s.async_read_some(buf, detail::read_write_handler(completion_promise)); + return completion_promise;//->wait(); + } + + template + future read_some(AsyncReadStream& s, char* buffer, size_t length) + { + promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + s.async_read_some(boost::asio::buffer(buffer, length), + detail::read_write_handler(completion_promise)); + return completion_promise;//->wait(); + } + + template + future read_some(AsyncReadStream& s, const std::shared_ptr& buffer, size_t length) + { + promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + s.async_read_some(boost::asio::buffer(buffer.get(), length), + detail::read_write_handler_with_buffer(completion_promise, buffer)); + return completion_promise;//->wait(); } template void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise::ptr completion_promise) { - s.async_read_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2)); + s.async_read_some(buf, detail::read_write_handler(completion_promise)); + } + + template + void async_read_some(AsyncReadStream& s, char* buffer, + size_t length, promise::ptr completion_promise) + { + s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise)); + } + + template + void async_read_some(AsyncReadStream& s, const std::shared_ptr& buffer, + size_t length, promise::ptr completion_promise) + { + s.async_read_some(boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(completion_promise, buffer)); } template @@ -107,7 +157,7 @@ namespace asio { template size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write")); - boost::asio::async_write(s, buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); + boost::asio::async_write(s, buf, detail::read_write_handler(p)); return p->wait(); } @@ -119,7 +169,23 @@ namespace asio { template future write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write_some")); - s.async_write_some( buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); + s.async_write_some( buf, detail::read_write_handler(p)); + return p; //->wait(); + } + + template + future write_some( AsyncWriteStream& s, const char* buffer, + size_t length ) { + promise::ptr p(new promise("fc::asio::write_some")); + s.async_write_some( boost::asio::buffer(buffer, length), detail::read_write_handler(p)); + return p; //->wait(); + } + + template + future write_some( AsyncWriteStream& s, const std::shared_ptr& buffer, + size_t length ) { + promise::ptr p(new promise("fc::asio::write_some")); + s.async_write_some( boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(p, buffer)); return p; //->wait(); } @@ -130,7 +196,21 @@ namespace asio { */ template void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise::ptr completion_promise) { - s.async_write_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2)); + s.async_write_some(buf, detail::read_write_handler(completion_promise)); + } + + template + void async_write_some(AsyncWriteStream& s, const char* buffer, + size_t length, promise::ptr completion_promise) { + s.async_write_some(boost::asio::buffer(buffer, length), + detail::read_write_handler(completion_promise)); + } + + template + void async_write_some(AsyncWriteStream& s, const std::shared_ptr& buffer, + size_t length, promise::ptr completion_promise) { + s.async_write_some(boost::asio::buffer(buffer.get(), length), + detail::read_write_handler_with_buffer(completion_promise, buffer)); } namespace tcp { diff --git a/include/fc/io/iostream.hpp b/include/fc/io/iostream.hpp index 27945ee..f743048 100644 --- a/include/fc/io/iostream.hpp +++ b/include/fc/io/iostream.hpp @@ -28,6 +28,7 @@ namespace fc { * @throws fc::eof_exception if len bytes cannot be read **/ istream& read( char* buf, size_t len ); + istream& read( const std::shared_ptr& buf, size_t len ); char get(); }; typedef std::shared_ptr istream_ptr; @@ -50,6 +51,7 @@ namespace fc { * but not flushed. **/ ostream& write( const char* buf, size_t len ); + ostream& write( const std::shared_ptr& buf, size_t len ); }; typedef std::shared_ptr ostream_ptr; diff --git a/include/fc/network/tcp_socket.hpp b/include/fc/network/tcp_socket.hpp index 384b65d..33ca221 100644 --- a/include/fc/network/tcp_socket.hpp +++ b/include/fc/network/tcp_socket.hpp @@ -32,12 +32,14 @@ namespace fc { /// istream interface /// @{ virtual size_t readsome( char* buffer, size_t max ); + virtual size_t readsome( const std::shared_ptr& buffer, size_t max ); virtual bool eof()const; /// @} /// ostream interface /// @{ virtual size_t writesome( const char* buffer, size_t len ); + virtual size_t writesome( const std::shared_ptr& buffer, size_t len ); virtual void flush(); virtual void close(); /// @} diff --git a/include/fc/network/tcp_socket_io_hooks.hpp b/include/fc/network/tcp_socket_io_hooks.hpp index a317ed1..e3d98dd 100644 --- a/include/fc/network/tcp_socket_io_hooks.hpp +++ b/include/fc/network/tcp_socket_io_hooks.hpp @@ -1,4 +1,5 @@ #include +#include namespace fc { @@ -7,6 +8,8 @@ namespace fc public: virtual ~tcp_socket_io_hooks() {} virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) = 0; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) = 0; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) = 0; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) = 0; }; } // namesapce fc diff --git a/include/fc/network/udp_socket.hpp b/include/fc/network/udp_socket.hpp index e5f0d5e..cef6eb6 100644 --- a/include/fc/network/udp_socket.hpp +++ b/include/fc/network/udp_socket.hpp @@ -1,6 +1,7 @@ #pragma once #include #include +#include namespace fc { namespace ip { @@ -22,6 +23,7 @@ namespace fc { void set_receive_buffer_size( size_t s ); void bind( const fc::ip::endpoint& ); size_t receive_from( char* b, size_t l, fc::ip::endpoint& from ); + size_t receive_from( std::shared_ptr b, size_t l, fc::ip::endpoint& from ); size_t send_to( const char* b, size_t l, const fc::ip::endpoint& to ); void close(); diff --git a/src/asio.cpp b/src/asio.cpp index 6432ea8..48e2855 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -6,32 +6,26 @@ namespace fc { namespace asio { namespace detail { - void read_write_handler( const promise::ptr& p, const boost::system::error_code& ec, size_t bytes_transferred ) { - if( !ec ) p->set_value(bytes_transferred); - else { - // elog( "%s", boost::system::system_error(ec).what() ); - // p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); -#if 0 - if( ec == boost::asio::error::operation_aborted ) - { - p->set_exception( fc::exception_ptr( new fc::canceled_exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - else -#endif - if( ec == boost::asio::error::eof ) - { - p->set_exception( fc::exception_ptr( new fc::eof_exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - else - { - // elog( "${message} ", ("message", boost::system::system_error(ec).what())); - p->set_exception( fc::exception_ptr( new fc::exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - } - } + + read_write_handler::read_write_handler(const promise::ptr& completion_promise) : + _completion_promise(completion_promise) + {} + void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred) + { + if( !ec ) + _completion_promise->set_value(bytes_transferred); + else if( ec == boost::asio::error::eof ) + _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + else + _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + } + read_write_handler_with_buffer::read_write_handler_with_buffer(const promise::ptr& completion_promise, + const std::shared_ptr& buffer) : + read_write_handler(completion_promise), + _buffer(buffer) + {} + + void read_write_handler_ec( promise* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) { p->set_value(bytes_transferred); *oec = ec; @@ -42,14 +36,6 @@ namespace fc { p->set_value(); else { -#if 0 - if( ec == boost::asio::error::operation_aborted ) - { - p->set_exception( fc::exception_ptr( new fc::canceled_exception( - FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); - } - else -#endif if( ec == boost::asio::error::eof ) { p->set_exception( fc::exception_ptr( new fc::eof_exception( diff --git a/src/io/iostream.cpp b/src/io/iostream.cpp index 6eef009..ddb370e 100644 --- a/src/io/iostream.cpp +++ b/src/io/iostream.cpp @@ -352,18 +352,34 @@ namespace fc { istream& istream::read( char* buf, size_t len ) { - auto pos = buf; + char* pos = buf; while( size_t(pos-buf) < len ) pos += readsome( pos, len - (pos - buf) ); return *this; } + istream& istream::read( const std::shared_ptr& buf, size_t len ) + { + char* pos = buf.get(); + while( size_t(pos-buf.get()) < len ) + pos += readsome( pos, len - (pos - buf.get()) ); + return *this; + } + ostream& ostream::write( const char* buf, size_t len ) { - auto pos = buf; + const char* pos = buf; while( size_t(pos-buf) < len ) pos += writesome( pos, len - (pos - buf) ); return *this; } + ostream& ostream::write( const std::shared_ptr& buf, size_t len ) + { + const char* pos = buf.get(); + while( size_t(pos-buf.get()) < len ) + pos += writesome( pos, len - (pos - buf.get()) ); + return *this; + } + } // namespace fc diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 500731e..07163e2 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -95,6 +95,10 @@ namespace fc { assert(_ntp_thread.is_current()); + uint32_t receive_buffer_size = sizeof(uint64_t) * 1024; + std::shared_ptr receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; }); + uint64_t* recv_buf = (uint64_t*)receive_buffer.get(); + //outer while to restart read-loop if exception is thrown while waiting to receive on socket. //while( !_read_loop_done.canceled() ) { @@ -108,10 +112,9 @@ namespace fc while( !_read_loop_done.canceled() ) { fc::ip::endpoint from; - std::array recv_buf; try { - _sock.receive_from( (char*)recv_buf.data(), recv_buf.size(), from ); + _sock.receive_from( receive_buffer, receive_buffer_size, from ); } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket"); uint64_t receive_timestamp_net_order = recv_buf[4]; diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index aac30ae..2647ca4 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -38,7 +38,8 @@ namespace fc { public: boost::asio::ip::tcp::socket& socket; - const char* buffer; + const char* raw_buffer; + std::shared_ptr shared_buffer; rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket, const char* buffer, @@ -46,20 +47,36 @@ namespace fc promise::ptr completion_promise) : rate_limited_operation(length, std::move(completion_promise)), socket(socket), - buffer(buffer) + raw_buffer(buffer) + {} + rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket, + const std::shared_ptr& buffer, + size_t length, + promise::ptr completion_promise) : + rate_limited_operation(length, std::move(completion_promise)), + socket(socket), + raw_buffer(nullptr), + shared_buffer(buffer) {} virtual void perform_operation() override { - asio::async_write_some(socket, - boost::asio::buffer(buffer, permitted_length), - completion_promise); + if (raw_buffer) + asio::async_write_some(socket, + raw_buffer, permitted_length, + completion_promise); + else + asio::async_write_some(socket, + shared_buffer, permitted_length, + completion_promise); } }; + class rate_limited_tcp_read_operation : public rate_limited_operation { public: boost::asio::ip::tcp::socket& socket; - char* buffer; + char* raw_buffer; + std::shared_ptr shared_buffer; rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket, char* buffer, @@ -67,13 +84,28 @@ namespace fc promise::ptr completion_promise) : rate_limited_operation(length, std::move(completion_promise)), socket(socket), - buffer(buffer) + raw_buffer(buffer) + {} + rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket, + const std::shared_ptr& buffer, + size_t length, + promise::ptr completion_promise) : + rate_limited_operation(length, std::move(completion_promise)), + socket(socket), + raw_buffer(nullptr), + shared_buffer(buffer) {} virtual void perform_operation() override { - asio::async_read_some(socket, - boost::asio::buffer(buffer, permitted_length), - completion_promise); + if (raw_buffer) + asio::async_read_some(socket, + raw_buffer, permitted_length, + completion_promise); + else + asio::async_read_some(socket, + shared_buffer, permitted_length, + completion_promise); + } }; @@ -177,7 +209,13 @@ namespace fc uint32_t burstiness_in_seconds = 1); virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + template + size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length); virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + template + size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length); void process_pending_reads(); void process_pending_writes(); @@ -202,7 +240,18 @@ namespace fc { } - size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) + size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return readsome_impl(socket, buffer, length); + } + + size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) + { + return readsome_impl(socket, buffer, length); + } + + template + size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length) { size_t bytes_read; if (_download_bytes_per_second) @@ -217,17 +266,38 @@ namespace fc else if (_new_read_operation_available_promise) _new_read_operation_available_promise->set_value(); - bytes_read = completion_promise->wait(); + try + { + bytes_read = completion_promise->wait(); + } + catch (...) + { + _read_operations_for_next_iteration.remove(&read_operation); + _read_operations_in_progress.remove(&read_operation); + throw; + } _unused_read_tokens += read_operation.permitted_length - bytes_read; } else - bytes_read = asio::read_some(socket, boost::asio::buffer(buffer, length)); + bytes_read = asio::read_some(socket, buffer, length); _actual_download_rate.update(bytes_read); return bytes_read; } - size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) + + size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) + { + return writesome_impl(socket, buffer, length); + } + + size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return writesome_impl(socket, buffer, length); + } + + template + size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length) { size_t bytes_written; if (_upload_bytes_per_second) @@ -242,16 +312,26 @@ namespace fc else if (_new_write_operation_available_promise) _new_write_operation_available_promise->set_value(); - bytes_written = completion_promise->wait(); + try + { + bytes_written = completion_promise->wait(); + } + catch (...) + { + _write_operations_for_next_iteration.remove(&write_operation); + _write_operations_in_progress.remove(&write_operation); + throw; + } _unused_write_tokens += write_operation.permitted_length - bytes_written; } else - bytes_written = asio::write_some(socket, boost::asio::buffer(buffer, length)); + bytes_written = asio::write_some(socket, buffer, length); _actual_upload_rate.update(bytes_written); return bytes_written; } + void rate_limiting_group_impl::process_pending_reads() { for (;;) @@ -348,8 +428,9 @@ namespace fc { if ((*iter)->permitted_length > 0) { - (*iter)->perform_operation(); + rate_limited_operation* operation_to_perform = *iter; iter = operations_in_progress.erase(iter); + operation_to_perform->perform_operation(); } else ++iter; @@ -362,13 +443,13 @@ namespace fc // the operation immediately without being queued up. This should only be hit if // we change from a limited rate to unlimited for (auto iter = operations_in_progress.begin(); - iter != operations_in_progress.end(); - ++iter) + iter != operations_in_progress.end();) { - (*iter)->permitted_length = (*iter)->length; - (*iter)->perform_operation(); + rate_limited_operation* operation_to_perform = *iter; + iter = operations_in_progress.erase(iter); + operation_to_perform->permitted_length = operation_to_perform->length; + operation_to_perform->perform_operation(); } - operations_in_progress.clear(); } last_iteration_start_time = this_iteration_start_time; } diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp index 7604709..1d67897 100644 --- a/src/network/tcp_socket.cpp +++ b/src/network/tcp_socket.cpp @@ -13,7 +13,7 @@ namespace fc { - class tcp_socket::impl : public tcp_socket_io_hooks{ + class tcp_socket::impl : public tcp_socket_io_hooks { public: impl() : _sock(fc::asio::default_io_service()), @@ -23,11 +23,27 @@ namespace fc { { if( _sock.is_open() ) _sock.close(); - if( _read_in_progress.valid() ) try { _read_in_progress.wait(); } catch ( ... ) {} - if( _write_in_progress.valid() ) try { _write_in_progress.wait(); } catch ( ... ) {} + if( _read_in_progress.valid() ) + try + { + _read_in_progress.wait(); + } + catch ( ... ) + { + } + if( _write_in_progress.valid() ) + try + { + _write_in_progress.wait(); + } + catch ( ... ) + { + } } virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; fc::future _write_in_progress; fc::future _read_in_progress; @@ -37,11 +53,19 @@ namespace fc { size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) { - return (_read_in_progress = fc::asio::read_some(socket, boost::asio::buffer(buffer, length))).wait(); + return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait(); + } + size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait(); } size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) { - return (_write_in_progress = fc::asio::write_some(socket, boost::asio::buffer(buffer, length))).wait(); + return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait(); + } + size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + { + return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait(); } @@ -72,7 +96,13 @@ namespace fc { return !my->_sock.is_open(); } - size_t tcp_socket::writesome(const char* buf, size_t len) { + size_t tcp_socket::writesome(const char* buf, size_t len) + { + return my->_io_hooks->writesome(my->_sock, buf, len); + } + + size_t tcp_socket::writesome(const std::shared_ptr& buf, size_t len) + { return my->_io_hooks->writesome(my->_sock, buf, len); } @@ -97,7 +127,12 @@ namespace fc { FC_RETHROW_EXCEPTIONS( warn, "error getting socket's local endpoint" ); } - size_t tcp_socket::readsome( char* buf, size_t len ) { + size_t tcp_socket::readsome( char* buf, size_t len ) + { + return my->_io_hooks->readsome(my->_sock, buf, len); + } + + size_t tcp_socket::readsome( const std::shared_ptr& buf, size_t len ) { return my->_io_hooks->readsome(my->_sock, buf, len); } diff --git a/src/network/udp_socket.cpp b/src/network/udp_socket.cpp index 5d4a664..9a1745a 100644 --- a/src/network/udp_socket.cpp +++ b/src/network/udp_socket.cpp @@ -73,6 +73,32 @@ namespace fc { void udp_socket::bind( const fc::ip::endpoint& e ) { my->_sock.bind( to_asio_ep(e) ); } + + size_t udp_socket::receive_from( std::shared_ptr receive_buffer, size_t receive_buffer_length, fc::ip::endpoint& from ) + { + try + { + boost::asio::ip::udp::endpoint boost_from_endpoint; + size_t bytes_read = my->_sock.receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length), boost_from_endpoint ); + from = to_fc_ep(boost_from_endpoint); + return bytes_read; + } + catch( const boost::system::system_error& e ) + { + if( e.code() != boost::asio::error::would_block ) + throw; + } + + boost::asio::ip::udp::endpoint boost_from_endpoint; + promise::ptr completion_promise(new promise("udp_socket::receive_from")); + my->_sock.async_receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length), + boost_from_endpoint, + asio::detail::read_write_handler_with_buffer(completion_promise, receive_buffer) ); + size_t bytes_read = completion_promise->wait(); + from = to_fc_ep(boost_from_endpoint); + return bytes_read; + } + size_t udp_socket::receive_from( char* receive_buffer, size_t receive_buffer_length, fc::ip::endpoint& from ) { try @@ -91,13 +117,7 @@ namespace fc { boost::asio::ip::udp::endpoint boost_from_endpoint; promise::ptr completion_promise(new promise("udp_socket::receive_from")); my->_sock.async_receive_from( boost::asio::buffer(receive_buffer, receive_buffer_length), boost_from_endpoint, - [=]( const boost::system::error_code& ec, size_t bytes_transferred ) { - if( !ec ) - completion_promise->set_value(bytes_transferred); - else - completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", - ("message", boost::system::system_error(ec).what())) ) ) ); - }); + asio::detail::read_write_handler(completion_promise) ); size_t bytes_read = completion_promise->wait(); from = to_fc_ep(boost_from_endpoint); return bytes_read;