Allow us to safely cancel tasks that are executing asynchronous network reads

and writes.  This was previously unsafe because we almost always passed read/write
buffers to boost that were on the stack. Canceling the task deleted the stack and
therefore the buffer, but couldn't reliably prevent boost from writing to the buffer
if data came in after the cancel.  This commit adds variants of the read and write
functions that take a shared_ptr<char> instead of a raw char* as the buffer, and
these variants will ensure the shared_ptr will outlive the boost::asio read/write.
This commit is contained in:
Eric Frias 2014-09-09 11:10:37 -04:00
parent db8eb2e5d4
commit aa6882b3b7
11 changed files with 315 additions and 85 deletions

View file

@ -19,9 +19,27 @@ namespace asio {
namespace detail {
using namespace fc;
void read_write_handler( const promise<size_t>::ptr& p,
const boost::system::error_code& ec,
size_t bytes_transferred );
class read_write_handler
{
public:
read_write_handler(const promise<size_t>::ptr& p);
void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
private:
promise<size_t>::ptr _completion_promise;
};
class read_write_handler_with_buffer : public read_write_handler
{
public:
read_write_handler_with_buffer(const promise<size_t>::ptr& p,
const std::shared_ptr<const char>& buffer);
private:
std::shared_ptr<const char> _buffer;
};
//void read_write_handler( const promise<size_t>::ptr& p,
// const boost::system::error_code& ec,
// size_t bytes_transferred );
void read_write_handler_ec( promise<size_t>* p,
boost::system::error_code* oec,
const boost::system::error_code& ec,
@ -62,7 +80,7 @@ namespace asio {
template<typename AsyncReadStream, typename MutableBufferSequence>
size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::read"));
boost::asio::async_read( s, buf, boost::bind( detail::read_write_handler, p, _1, _2 ) );
boost::asio::async_read( s, buf, detail::read_write_handler(p) );
return p->wait();
}
/**
@ -81,15 +99,47 @@ namespace asio {
template<typename AsyncReadStream, typename MutableBufferSequence>
future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
{
promise<size_t>::ptr p(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(buf, boost::bind(detail::read_write_handler, p, _1, _2));
return p;//->wait();
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(buf, detail::read_write_handler(completion_promise));
return completion_promise;//->wait();
}
template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(boost::asio::buffer(buffer, length),
detail::read_write_handler(completion_promise));
return completion_promise;//->wait();
}
template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(boost::asio::buffer(buffer.get(), length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
return completion_promise;//->wait();
}
template<typename AsyncReadStream, typename MutableBufferSequence>
void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise<size_t>::ptr completion_promise)
{
s.async_read_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2));
s.async_read_some(buf, detail::read_write_handler(completion_promise));
}
template<typename AsyncReadStream>
void async_read_some(AsyncReadStream& s, char* buffer,
size_t length, promise<size_t>::ptr completion_promise)
{
s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise));
}
template<typename AsyncReadStream>
void async_read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer,
size_t length, promise<size_t>::ptr completion_promise)
{
s.async_read_some(boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(completion_promise, buffer));
}
template<typename AsyncReadStream>
@ -107,7 +157,7 @@ namespace asio {
template<typename AsyncWriteStream, typename ConstBufferSequence>
size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write"));
boost::asio::async_write(s, buf, boost::bind( detail::read_write_handler, p, _1, _2 ) );
boost::asio::async_write(s, buf, detail::read_write_handler(p));
return p->wait();
}
@ -119,7 +169,23 @@ namespace asio {
template<typename AsyncWriteStream, typename ConstBufferSequence>
future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( buf, boost::bind( detail::read_write_handler, p, _1, _2 ) );
s.async_write_some( buf, detail::read_write_handler(p));
return p; //->wait();
}
template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const char* buffer,
size_t length ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( boost::asio::buffer(buffer, length), detail::read_write_handler(p));
return p; //->wait();
}
template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(p, buffer));
return p; //->wait();
}
@ -130,7 +196,21 @@ namespace asio {
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise<size_t>::ptr completion_promise) {
s.async_write_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2));
s.async_write_some(buf, detail::read_write_handler(completion_promise));
}
template<typename AsyncWriteStream>
void async_write_some(AsyncWriteStream& s, const char* buffer,
size_t length, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer, length),
detail::read_write_handler(completion_promise));
}
template<typename AsyncWriteStream>
void async_write_some(AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer.get(), length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
}
namespace tcp {

View file

@ -28,6 +28,7 @@ namespace fc {
* @throws fc::eof_exception if len bytes cannot be read
**/
istream& read( char* buf, size_t len );
istream& read( const std::shared_ptr<char>& buf, size_t len );
char get();
};
typedef std::shared_ptr<istream> istream_ptr;
@ -50,6 +51,7 @@ namespace fc {
* but not flushed.
**/
ostream& write( const char* buf, size_t len );
ostream& write( const std::shared_ptr<const char>& buf, size_t len );
};
typedef std::shared_ptr<ostream> ostream_ptr;

View file

@ -32,12 +32,14 @@ namespace fc {
/// istream interface
/// @{
virtual size_t readsome( char* buffer, size_t max );
virtual size_t readsome( const std::shared_ptr<char>& buffer, size_t max );
virtual bool eof()const;
/// @}
/// ostream interface
/// @{
virtual size_t writesome( const char* buffer, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buffer, size_t len );
virtual void flush();
virtual void close();
/// @}

View file

@ -1,4 +1,5 @@
#include <boost/asio.hpp>
#include <memory>
namespace fc
{
@ -7,6 +8,8 @@ namespace fc
public:
virtual ~tcp_socket_io_hooks() {}
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) = 0;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length) = 0;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) = 0;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length) = 0;
};
} // namesapce fc

View file

@ -1,6 +1,7 @@
#pragma once
#include <fc/utility.hpp>
#include <fc/shared_ptr.hpp>
#include <memory>
namespace fc {
namespace ip {
@ -22,6 +23,7 @@ namespace fc {
void set_receive_buffer_size( size_t s );
void bind( const fc::ip::endpoint& );
size_t receive_from( char* b, size_t l, fc::ip::endpoint& from );
size_t receive_from( std::shared_ptr<char> b, size_t l, fc::ip::endpoint& from );
size_t send_to( const char* b, size_t l, const fc::ip::endpoint& to );
void close();

View file

@ -6,32 +6,26 @@
namespace fc {
namespace asio {
namespace detail {
void read_write_handler( const promise<size_t>::ptr& p, const boost::system::error_code& ec, size_t bytes_transferred ) {
if( !ec ) p->set_value(bytes_transferred);
else {
// elog( "%s", boost::system::system_error(ec).what() );
// p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) );
#if 0
if( ec == boost::asio::error::operation_aborted )
{
p->set_exception( fc::exception_ptr( new fc::canceled_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
else
#endif
if( ec == boost::asio::error::eof )
{
p->set_exception( fc::exception_ptr( new fc::eof_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
else
{
// elog( "${message} ", ("message", boost::system::system_error(ec).what()));
p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
}
}
read_write_handler::read_write_handler(const promise<size_t>::ptr& completion_promise) :
_completion_promise(completion_promise)
{}
void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
{
if( !ec )
_completion_promise->set_value(bytes_transferred);
else if( ec == boost::asio::error::eof )
_completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
else
_completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
read_write_handler_with_buffer::read_write_handler_with_buffer(const promise<size_t>::ptr& completion_promise,
const std::shared_ptr<const char>& buffer) :
read_write_handler(completion_promise),
_buffer(buffer)
{}
void read_write_handler_ec( promise<size_t>* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) {
p->set_value(bytes_transferred);
*oec = ec;
@ -42,14 +36,6 @@ namespace fc {
p->set_value();
else
{
#if 0
if( ec == boost::asio::error::operation_aborted )
{
p->set_exception( fc::exception_ptr( new fc::canceled_exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
else
#endif
if( ec == boost::asio::error::eof )
{
p->set_exception( fc::exception_ptr( new fc::eof_exception(

View file

@ -352,18 +352,34 @@ namespace fc {
istream& istream::read( char* buf, size_t len )
{
auto pos = buf;
char* pos = buf;
while( size_t(pos-buf) < len )
pos += readsome( pos, len - (pos - buf) );
return *this;
}
istream& istream::read( const std::shared_ptr<char>& buf, size_t len )
{
char* pos = buf.get();
while( size_t(pos-buf.get()) < len )
pos += readsome( pos, len - (pos - buf.get()) );
return *this;
}
ostream& ostream::write( const char* buf, size_t len )
{
auto pos = buf;
const char* pos = buf;
while( size_t(pos-buf) < len )
pos += writesome( pos, len - (pos - buf) );
return *this;
}
ostream& ostream::write( const std::shared_ptr<const char>& buf, size_t len )
{
const char* pos = buf.get();
while( size_t(pos-buf.get()) < len )
pos += writesome( pos, len - (pos - buf.get()) );
return *this;
}
} // namespace fc

View file

@ -95,6 +95,10 @@ namespace fc
{
assert(_ntp_thread.is_current());
uint32_t receive_buffer_size = sizeof(uint64_t) * 1024;
std::shared_ptr<char> receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; });
uint64_t* recv_buf = (uint64_t*)receive_buffer.get();
//outer while to restart read-loop if exception is thrown while waiting to receive on socket.
//while( !_read_loop_done.canceled() )
{
@ -108,10 +112,9 @@ namespace fc
while( !_read_loop_done.canceled() )
{
fc::ip::endpoint from;
std::array<uint64_t, 1024> recv_buf;
try
{
_sock.receive_from( (char*)recv_buf.data(), recv_buf.size(), from );
_sock.receive_from( receive_buffer, receive_buffer_size, from );
} FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket");
uint64_t receive_timestamp_net_order = recv_buf[4];

View file

@ -38,7 +38,8 @@ namespace fc
{
public:
boost::asio::ip::tcp::socket& socket;
const char* buffer;
const char* raw_buffer;
std::shared_ptr<const char> shared_buffer;
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
const char* buffer,
@ -46,20 +47,36 @@ namespace fc
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
socket(socket),
buffer(buffer)
raw_buffer(buffer)
{}
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<const char>& buffer,
size_t length,
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
socket(socket),
raw_buffer(nullptr),
shared_buffer(buffer)
{}
virtual void perform_operation() override
{
asio::async_write_some(socket,
boost::asio::buffer(buffer, permitted_length),
completion_promise);
if (raw_buffer)
asio::async_write_some(socket,
raw_buffer, permitted_length,
completion_promise);
else
asio::async_write_some(socket,
shared_buffer, permitted_length,
completion_promise);
}
};
class rate_limited_tcp_read_operation : public rate_limited_operation
{
public:
boost::asio::ip::tcp::socket& socket;
char* buffer;
char* raw_buffer;
std::shared_ptr<char> shared_buffer;
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
char* buffer,
@ -67,13 +84,28 @@ namespace fc
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
socket(socket),
buffer(buffer)
raw_buffer(buffer)
{}
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<char>& buffer,
size_t length,
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
socket(socket),
raw_buffer(nullptr),
shared_buffer(buffer)
{}
virtual void perform_operation() override
{
asio::async_read_some(socket,
boost::asio::buffer(buffer, permitted_length),
completion_promise);
if (raw_buffer)
asio::async_read_some(socket,
raw_buffer, permitted_length,
completion_promise);
else
asio::async_read_some(socket,
shared_buffer, permitted_length,
completion_promise);
}
};
@ -177,7 +209,13 @@ namespace fc
uint32_t burstiness_in_seconds = 1);
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length) override;
template <typename BufferType>
size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length);
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length) override;
template <typename BufferType>
size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length);
void process_pending_reads();
void process_pending_writes();
@ -202,7 +240,18 @@ namespace fc
{
}
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length)
{
return readsome_impl(socket, buffer, length);
}
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
{
return readsome_impl(socket, buffer, length);
}
template <typename BufferType>
size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length)
{
size_t bytes_read;
if (_download_bytes_per_second)
@ -217,17 +266,38 @@ namespace fc
else if (_new_read_operation_available_promise)
_new_read_operation_available_promise->set_value();
bytes_read = completion_promise->wait();
try
{
bytes_read = completion_promise->wait();
}
catch (...)
{
_read_operations_for_next_iteration.remove(&read_operation);
_read_operations_in_progress.remove(&read_operation);
throw;
}
_unused_read_tokens += read_operation.permitted_length - bytes_read;
}
else
bytes_read = asio::read_some(socket, boost::asio::buffer(buffer, length));
bytes_read = asio::read_some(socket, buffer, length);
_actual_download_rate.update(bytes_read);
return bytes_read;
}
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
{
return writesome_impl(socket, buffer, length);
}
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length)
{
return writesome_impl(socket, buffer, length);
}
template <typename BufferType>
size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length)
{
size_t bytes_written;
if (_upload_bytes_per_second)
@ -242,16 +312,26 @@ namespace fc
else if (_new_write_operation_available_promise)
_new_write_operation_available_promise->set_value();
bytes_written = completion_promise->wait();
try
{
bytes_written = completion_promise->wait();
}
catch (...)
{
_write_operations_for_next_iteration.remove(&write_operation);
_write_operations_in_progress.remove(&write_operation);
throw;
}
_unused_write_tokens += write_operation.permitted_length - bytes_written;
}
else
bytes_written = asio::write_some(socket, boost::asio::buffer(buffer, length));
bytes_written = asio::write_some(socket, buffer, length);
_actual_upload_rate.update(bytes_written);
return bytes_written;
}
void rate_limiting_group_impl::process_pending_reads()
{
for (;;)
@ -348,8 +428,9 @@ namespace fc
{
if ((*iter)->permitted_length > 0)
{
(*iter)->perform_operation();
rate_limited_operation* operation_to_perform = *iter;
iter = operations_in_progress.erase(iter);
operation_to_perform->perform_operation();
}
else
++iter;
@ -362,13 +443,13 @@ namespace fc
// the operation immediately without being queued up. This should only be hit if
// we change from a limited rate to unlimited
for (auto iter = operations_in_progress.begin();
iter != operations_in_progress.end();
++iter)
iter != operations_in_progress.end();)
{
(*iter)->permitted_length = (*iter)->length;
(*iter)->perform_operation();
rate_limited_operation* operation_to_perform = *iter;
iter = operations_in_progress.erase(iter);
operation_to_perform->permitted_length = operation_to_perform->length;
operation_to_perform->perform_operation();
}
operations_in_progress.clear();
}
last_iteration_start_time = this_iteration_start_time;
}

View file

@ -13,7 +13,7 @@
namespace fc {
class tcp_socket::impl : public tcp_socket_io_hooks{
class tcp_socket::impl : public tcp_socket_io_hooks {
public:
impl() :
_sock(fc::asio::default_io_service()),
@ -23,11 +23,27 @@ namespace fc {
{
if( _sock.is_open() )
_sock.close();
if( _read_in_progress.valid() ) try { _read_in_progress.wait(); } catch ( ... ) {}
if( _write_in_progress.valid() ) try { _write_in_progress.wait(); } catch ( ... ) {}
if( _read_in_progress.valid() )
try
{
_read_in_progress.wait();
}
catch ( ... )
{
}
if( _write_in_progress.valid() )
try
{
_write_in_progress.wait();
}
catch ( ... )
{
}
}
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length) override;
fc::future<size_t> _write_in_progress;
fc::future<size_t> _read_in_progress;
@ -37,11 +53,19 @@ namespace fc {
size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
{
return (_read_in_progress = fc::asio::read_some(socket, boost::asio::buffer(buffer, length))).wait();
return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait();
}
size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length)
{
return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait();
}
size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
{
return (_write_in_progress = fc::asio::write_some(socket, boost::asio::buffer(buffer, length))).wait();
return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait();
}
size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length)
{
return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait();
}
@ -72,7 +96,13 @@ namespace fc {
return !my->_sock.is_open();
}
size_t tcp_socket::writesome(const char* buf, size_t len) {
size_t tcp_socket::writesome(const char* buf, size_t len)
{
return my->_io_hooks->writesome(my->_sock, buf, len);
}
size_t tcp_socket::writesome(const std::shared_ptr<const char>& buf, size_t len)
{
return my->_io_hooks->writesome(my->_sock, buf, len);
}
@ -97,7 +127,12 @@ namespace fc {
FC_RETHROW_EXCEPTIONS( warn, "error getting socket's local endpoint" );
}
size_t tcp_socket::readsome( char* buf, size_t len ) {
size_t tcp_socket::readsome( char* buf, size_t len )
{
return my->_io_hooks->readsome(my->_sock, buf, len);
}
size_t tcp_socket::readsome( const std::shared_ptr<char>& buf, size_t len ) {
return my->_io_hooks->readsome(my->_sock, buf, len);
}

View file

@ -73,6 +73,32 @@ namespace fc {
void udp_socket::bind( const fc::ip::endpoint& e ) {
my->_sock.bind( to_asio_ep(e) );
}
size_t udp_socket::receive_from( std::shared_ptr<char> receive_buffer, size_t receive_buffer_length, fc::ip::endpoint& from )
{
try
{
boost::asio::ip::udp::endpoint boost_from_endpoint;
size_t bytes_read = my->_sock.receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length), boost_from_endpoint );
from = to_fc_ep(boost_from_endpoint);
return bytes_read;
}
catch( const boost::system::system_error& e )
{
if( e.code() != boost::asio::error::would_block )
throw;
}
boost::asio::ip::udp::endpoint boost_from_endpoint;
promise<size_t>::ptr completion_promise(new promise<size_t>("udp_socket::receive_from"));
my->_sock.async_receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length),
boost_from_endpoint,
asio::detail::read_write_handler_with_buffer(completion_promise, receive_buffer) );
size_t bytes_read = completion_promise->wait();
from = to_fc_ep(boost_from_endpoint);
return bytes_read;
}
size_t udp_socket::receive_from( char* receive_buffer, size_t receive_buffer_length, fc::ip::endpoint& from )
{
try
@ -91,13 +117,7 @@ namespace fc {
boost::asio::ip::udp::endpoint boost_from_endpoint;
promise<size_t>::ptr completion_promise(new promise<size_t>("udp_socket::receive_from"));
my->_sock.async_receive_from( boost::asio::buffer(receive_buffer, receive_buffer_length), boost_from_endpoint,
[=]( const boost::system::error_code& ec, size_t bytes_transferred ) {
if( !ec )
completion_promise->set_value(bytes_transferred);
else
completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ",
("message", boost::system::system_error(ec).what())) ) ) );
});
asio::detail::read_write_handler(completion_promise) );
size_t bytes_read = completion_promise->wait();
from = to_fc_ep(boost_from_endpoint);
return bytes_read;