Fix more locations where we were making boost::asio calls with buffers declared on the stack which could cause problems when the calling tasks were canceled.

This commit is contained in:
Eric Frias 2014-09-11 16:30:03 -04:00
parent 55e7a073cf
commit 751777e754
18 changed files with 224 additions and 76 deletions

View file

@ -310,7 +310,7 @@ if(WIN32)
endif()
endforeach()
message("openssl_libraries=${OPENSSL_LIBRARIES}")
# message(STATUS "openssl_libraries=${OPENSSL_LIBRARIES}")
foreach(lib ${OPENSSL_LIBRARIES})
get_filename_component(lib_name ${lib} NAME_WE)
if (${lib_name} STREQUAL "libeay32")

View file

@ -28,12 +28,14 @@ namespace asio {
promise<size_t>::ptr _completion_promise;
};
class read_write_handler_with_buffer : public read_write_handler
class read_write_handler_with_buffer
{
public:
read_write_handler_with_buffer(const promise<size_t>::ptr& p,
const std::shared_ptr<const char>& buffer);
void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
private:
promise<size_t>::ptr _completion_promise;
std::shared_ptr<const char> _buffer;
};
@ -105,19 +107,19 @@ namespace asio {
}
template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length)
future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(boost::asio::buffer(buffer, length),
s.async_read_some(boost::asio::buffer(buffer + offset, length),
detail::read_write_handler(completion_promise));
return completion_promise;//->wait();
}
template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length)
future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(boost::asio::buffer(buffer.get(), length),
s.async_read_some(boost::asio::buffer(buffer.get() + offset, length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
return completion_promise;//->wait();
}
@ -137,9 +139,9 @@ namespace asio {
template<typename AsyncReadStream>
void async_read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer,
size_t length, promise<size_t>::ptr completion_promise)
size_t length, size_t offset, promise<size_t>::ptr completion_promise)
{
s.async_read_some(boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(completion_promise, buffer));
s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer));
}
template<typename AsyncReadStream>
@ -175,17 +177,17 @@ namespace asio {
template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const char* buffer,
size_t length ) {
size_t length, size_t offset = 0) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( boost::asio::buffer(buffer, length), detail::read_write_handler(p));
s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p));
return p; //->wait();
}
template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length ) {
size_t length, size_t offset ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(p, buffer));
s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer));
return p; //->wait();
}
@ -208,8 +210,8 @@ namespace asio {
template<typename AsyncWriteStream>
void async_write_some(AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer.get(), length),
size_t length, size_t offset, promise<size_t>::ptr completion_promise) {
s.async_write_some(boost::asio::buffer(buffer.get() + offset, length),
detail::read_write_handler_with_buffer(completion_promise, buffer));
}
@ -263,8 +265,11 @@ namespace asio {
virtual size_t readsome( char* buf, size_t len )
{
auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) ).wait();
return r;
return fc::asio::read_some(*_stream, buf, len).wait();
}
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
return fc::asio::read_some(*_stream, buf, len, offset).wait();
}
private:
@ -280,7 +285,12 @@ namespace asio {
virtual size_t writesome( const char* buf, size_t len )
{
return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) ).wait();
return fc::asio::write_some(*_stream, buf, len).wait();
}
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
return fc::asio::write_some(*_stream, buf, len, offset).wait();
}
virtual void close(){ _stream->close(); }

View file

@ -32,6 +32,7 @@ namespace fc
* @throws fc::eof if at least 1 byte cannot be read
**/
virtual std::size_t readsome( char* buf, std::size_t len );
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
/**
* This method may block until at least 1 character is
@ -61,6 +62,7 @@ namespace fc
* is full, in which case it will flush which may block.
*/
virtual size_t writesome( const char* buf, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );
virtual void close();
virtual void flush();

View file

@ -14,6 +14,7 @@ namespace fc {
void open( const fc::path& file, int m = binary );
size_t writesome( const char* buf, size_t len );
size_t writesome(const std::shared_ptr<const char>& buffer, size_t len, size_t offset);
void put( char c );
void close();
void flush();
@ -34,6 +35,7 @@ namespace fc {
void open( const fc::path& file, int m );
size_t readsome( char* buf, size_t len );
size_t readsome(const std::shared_ptr<char>& buffer, size_t max, size_t offset);
ifstream& read( char* buf, size_t len );
ifstream& seekg( size_t p, seekdir d = beg );
void get( char& c ) { read( &c, 1 ); }

View file

@ -21,6 +21,7 @@ namespace fc {
* @throws fc::eof if at least 1 byte cannot be read
**/
virtual size_t readsome( char* buf, size_t len ) = 0;
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset ) = 0;
/** read len bytes or throw, this method is implemented
* in terms of readsome.
@ -28,7 +29,7 @@ namespace fc {
* @throws fc::eof_exception if len bytes cannot be read
**/
istream& read( char* buf, size_t len );
istream& read( const std::shared_ptr<char>& buf, size_t len );
istream& read( const std::shared_ptr<char>& buf, size_t len, size_t offset = 0 );
char get();
};
typedef std::shared_ptr<istream> istream_ptr;
@ -42,6 +43,7 @@ namespace fc {
public:
virtual ~ostream(){};
virtual size_t writesome( const char* buf, size_t len ) = 0;
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset ) = 0;
virtual void close() = 0;
virtual void flush() = 0;
@ -51,7 +53,7 @@ namespace fc {
* but not flushed.
**/
ostream& write( const char* buf, size_t len );
ostream& write( const std::shared_ptr<const char>& buf, size_t len );
ostream& write( const std::shared_ptr<const char>& buf, size_t len, size_t offset = 0 );
};
typedef std::shared_ptr<ostream> ostream_ptr;

View file

@ -18,7 +18,9 @@ namespace fc {
virtual bool eof()const;
virtual size_t writesome( const char* buf, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );
virtual size_t readsome( char* buf, size_t len );
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
virtual void close();
virtual void flush();
char peek();

View file

@ -7,6 +7,7 @@ namespace fc
class cout_t : virtual public ostream {
public:
virtual size_t writesome( const char* buf, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );
virtual void close();
virtual void flush();
};
@ -14,6 +15,7 @@ namespace fc
class cerr_t : virtual public ostream {
public:
virtual size_t writesome( const char* buf, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );
virtual void close();
virtual void flush();
};
@ -22,6 +24,7 @@ namespace fc
public:
~cin_t();
virtual size_t readsome( char* buf, size_t len );
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
virtual istream& read( char* buf, size_t len );
virtual bool eof()const;
};

View file

@ -32,14 +32,14 @@ namespace fc {
/// istream interface
/// @{
virtual size_t readsome( char* buffer, size_t max );
virtual size_t readsome( const std::shared_ptr<char>& buffer, size_t max );
virtual size_t readsome(const std::shared_ptr<char>& buffer, size_t max, size_t offset);
virtual bool eof()const;
/// @}
/// ostream interface
/// @{
virtual size_t writesome( const char* buffer, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buffer, size_t len );
virtual size_t writesome(const std::shared_ptr<const char>& buffer, size_t len, size_t offset);
virtual void flush();
virtual void close();
/// @}

View file

@ -8,8 +8,8 @@ namespace fc
public:
virtual ~tcp_socket_io_hooks() {}
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) = 0;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length) = 0;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset) = 0;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) = 0;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length) = 0;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset) = 0;
};
} // namesapce fc

View file

@ -29,12 +29,14 @@ namespace fc {
/// istream interface
/// @{
virtual size_t readsome( char* buffer, size_t max );
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
virtual bool eof()const;
/// @}
/// ostream interface
/// @{
virtual size_t writesome( const char* buffer, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );
virtual void flush();
virtual void close();
/// @}

View file

@ -9,9 +9,12 @@ namespace fc {
read_write_handler::read_write_handler(const promise<size_t>::ptr& completion_promise) :
_completion_promise(completion_promise)
{}
{
// assert(false); // to detect anywhere we're not passing in a shared buffer
}
void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
{
// assert(false); // to detect anywhere we're not passing in a shared buffer
if( !ec )
_completion_promise->set_value(bytes_transferred);
else if( ec == boost::asio::error::eof )
@ -21,10 +24,18 @@ namespace fc {
}
read_write_handler_with_buffer::read_write_handler_with_buffer(const promise<size_t>::ptr& completion_promise,
const std::shared_ptr<const char>& buffer) :
read_write_handler(completion_promise),
_completion_promise(completion_promise),
_buffer(buffer)
{}
void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
{
if( !ec )
_completion_promise->set_value(bytes_transferred);
else if( ec == boost::asio::error::eof )
_completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
else
_completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
void read_write_handler_ec( promise<size_t>* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) {
p->set_value(bytes_transferred);

View file

@ -12,11 +12,19 @@ namespace fc
class buffered_istream_impl
{
public:
buffered_istream_impl( istream_ptr is )
:_istr(fc::move(is)){}
buffered_istream_impl( istream_ptr is ) :
_istr(fc::move(is))
#ifndef NDEBUG
,_shared_read_buffer_in_use(false)
#endif
{}
istream_ptr _istr;
boost::asio::streambuf _rdbuf;
std::shared_ptr<char> _shared_read_buffer;
#ifndef NDEBUG
bool _shared_read_buffer_in_use;
#endif
};
static const size_t minimum_read_size = 1024;
}
@ -64,6 +72,43 @@ namespace fc
return bytes_to_deliver_immediately;
}
size_t buffered_istream::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(buf.get() + offset, len));
if (bytes_from_rdbuf)
return bytes_from_rdbuf;
if( len > detail::minimum_read_size )
return my->_istr->readsome(buf.get() + offset, len);
#ifndef NDEBUG
// This code was written with the assumption that you'd only be making one call to readsome
// at a time so it reuses _shared_read_buffer. If you really need to make concurrent calls to
// readsome(), you'll need to prevent reusing _shared_read_buffer here
struct check_buffer_in_use {
bool& _buffer_in_use;
check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
} buffer_in_use_checker(my->_shared_read_buffer_in_use);
#endif
if (!my->_shared_read_buffer)
my->_shared_read_buffer.reset(new char[detail::minimum_read_size], [](char* p){ delete[] p; });
size_t bytes_read = my->_istr->readsome( my->_shared_read_buffer, detail::minimum_read_size, 0 );
size_t bytes_to_deliver_immediately = std::min<size_t>(bytes_read,len);
memcpy( buf.get() + offset, my->_shared_read_buffer.get(), bytes_to_deliver_immediately );
if( bytes_read > len )
{
my->_rdbuf.sputn( my->_shared_read_buffer.get() + len, bytes_read - len );
}
return bytes_to_deliver_immediately;
}
char buffered_istream::peek()const
{
if( my->_rdbuf.size() )
@ -89,11 +134,19 @@ namespace fc
class buffered_ostream_impl
{
public:
buffered_ostream_impl( ostream_ptr os )
:_ostr(fc::move(os)){}
buffered_ostream_impl( ostream_ptr os ) :
_ostr(fc::move(os))
#ifndef NDEBUG
,_shared_write_buffer_in_use(false)
#endif
{}
ostream_ptr _ostr;
boost::asio::streambuf _rdbuf;
std::shared_ptr<char> _shared_write_buffer;
#ifndef NDEBUG
bool _shared_write_buffer_in_use;
#endif
};
}
@ -120,11 +173,29 @@ namespace fc
return written + static_cast<size_t>(my->_rdbuf.sputn( buf+written, len-written ));
}
size_t buffered_ostream::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
return writesome(buf.get() + offset, len);
}
void buffered_ostream::flush()
{
char buffer[2048];
while( size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(buffer,sizeof(buffer))) )
my->_ostr->write( buffer, bytes_from_rdbuf );
#ifndef NDEBUG
// This code was written with the assumption that you'd only be making one call to flush
// at a time so it reuses _shared_write_buffer. If you really need to make concurrent calls to
// flush(), you'll need to prevent reusing _shared_write_buffer here
struct check_buffer_in_use {
bool& _buffer_in_use;
check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
} buffer_in_use_checker(my->_shared_write_buffer_in_use);
#endif
const size_t write_buffer_size = 2048;
if (!my->_shared_write_buffer)
my->_shared_write_buffer.reset(new char[write_buffer_size], [](char* p){ delete[] p; });
while( size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(my->_shared_write_buffer.get(), write_buffer_size)) )
my->_ostr->write( my->_shared_write_buffer, bytes_from_rdbuf );
my->_ostr->flush();
}

View file

@ -31,6 +31,11 @@ namespace fc {
my->ofs.write(buf,len);
return len;
}
size_t ofstream::writesome(const std::shared_ptr<const char>& buffer, size_t len, size_t offset)
{
return writesome(buffer.get() + offset, len);
}
void ofstream::put( char c ) {
my->ofs.put(c);
}
@ -62,6 +67,11 @@ namespace fc {
}
return s;
}
size_t ifstream::readsome(const std::shared_ptr<char>& buffer, size_t max, size_t offset)
{
return readsome(buffer.get() + offset, max);
}
ifstream& ifstream::read( char* buf, size_t len ) {
if( eof() ) FC_THROW_EXCEPTION( eof_exception , "");
my->ifs.read(buf,len);

View file

@ -96,10 +96,12 @@ namespace fc {
size_t cout_t::writesome( const char* buf, size_t len ) { std::cout.write(buf,len); return len; }
size_t cout_t::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset ) { return writesome(buf.get() + offset, len); }
void cout_t::close() {}
void cout_t::flush() { std::cout.flush(); }
size_t cerr_t::writesome( const char* buf, size_t len ) { std::cerr.write(buf,len); return len; }
size_t cerr_t::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset ) { return writesome(buf.get() + offset, len); }
void cerr_t::close() {};
void cerr_t::flush() { std::cerr.flush(); }
@ -127,6 +129,7 @@ namespace fc {
}
return size_t(u);
}
size_t cin_t::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset ) { return readsome(buf.get() + offset, len); }
cin_t::~cin_t() {
/*
@ -358,12 +361,12 @@ namespace fc {
return *this;
}
istream& istream::read( const std::shared_ptr<char>& buf, size_t len )
istream& istream::read( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
char* pos = buf.get();
while( size_t(pos-buf.get()) < len )
pos += readsome( pos, len - (pos - buf.get()) );
return *this;
size_t bytes_read = 0;
while( bytes_read < len )
bytes_read += readsome(buf, len - bytes_read, bytes_read + offset);
return *this;
}
ostream& ostream::write( const char* buf, size_t len )
@ -374,12 +377,12 @@ namespace fc {
return *this;
}
ostream& ostream::write( const std::shared_ptr<const char>& buf, size_t len )
ostream& ostream::write( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
const char* pos = buf.get();
while( size_t(pos-buf.get()) < len )
pos += writesome( pos, len - (pos - buf.get()) );
return *this;
size_t bytes_written = 0;
while( bytes_written < len )
bytes_written += writesome(buf, len - bytes_written, bytes_written + offset);
return *this;
}
} // namespace fc

View file

@ -54,6 +54,11 @@ namespace fc {
}
return len;
}
size_t stringstream::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
return writesome(buf.get() + offset, len);
}
size_t stringstream::readsome( char* buf, size_t len ) {
size_t r = static_cast<size_t>(my->ss.readsome(buf,len));
if( my->ss.eof() || r == 0 )
@ -62,6 +67,12 @@ namespace fc {
}
return r;
}
size_t stringstream::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
return readsome(buf.get() + offset, len);
}
void stringstream::close(){ my->ss.flush(); };
void stringstream::flush(){ my->ss.flush(); };

View file

@ -21,12 +21,15 @@ namespace fc
{
public:
size_t length;
size_t offset;
size_t permitted_length;
promise<size_t>::ptr completion_promise;
rate_limited_operation(size_t length,
size_t offset,
promise<size_t>::ptr&& completion_promise) :
length(length),
offset(offset),
permitted_length(0),
completion_promise(completion_promise)
{}
@ -44,16 +47,20 @@ namespace fc
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
const char* buffer,
size_t length,
size_t offset,
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
rate_limited_operation(length, offset, std::move(completion_promise)),
socket(socket),
raw_buffer(buffer)
{}
{
assert(false);
}
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<const char>& buffer,
size_t length,
size_t offset,
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
rate_limited_operation(length, offset, std::move(completion_promise)),
socket(socket),
raw_buffer(nullptr),
shared_buffer(buffer)
@ -66,7 +73,7 @@ namespace fc
completion_promise);
else
asio::async_write_some(socket,
shared_buffer, permitted_length,
shared_buffer, permitted_length, offset,
completion_promise);
}
};
@ -81,16 +88,18 @@ namespace fc
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
char* buffer,
size_t length,
size_t offset,
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
rate_limited_operation(length, offset, std::move(completion_promise)),
socket(socket),
raw_buffer(buffer)
{}
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<char>& buffer,
size_t length,
size_t offset,
promise<size_t>::ptr completion_promise) :
rate_limited_operation(length, std::move(completion_promise)),
rate_limited_operation(length, offset, std::move(completion_promise)),
socket(socket),
raw_buffer(nullptr),
shared_buffer(buffer)
@ -103,7 +112,7 @@ namespace fc
completion_promise);
else
asio::async_read_some(socket,
shared_buffer, permitted_length,
shared_buffer, permitted_length, offset,
completion_promise);
}
@ -209,13 +218,13 @@ namespace fc
uint32_t burstiness_in_seconds = 1);
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length) override;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset) override;
template <typename BufferType>
size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length);
size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset);
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset) override;
template <typename BufferType>
size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length);
size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset);
void process_pending_reads();
void process_pending_writes();
@ -240,24 +249,24 @@ namespace fc
{
}
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length)
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
{
return readsome_impl(socket, buffer, length);
return readsome_impl(socket, buffer, length, offset);
}
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
{
return readsome_impl(socket, buffer, length);
return readsome_impl(socket, buffer, length, 0);
}
template <typename BufferType>
size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length)
size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset)
{
size_t bytes_read;
if (_download_bytes_per_second)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::readsome"));
rate_limited_tcp_read_operation read_operation(socket, buffer, length, completion_promise);
rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise);
_read_operations_for_next_iteration.push_back(&read_operation);
// launch the read processing loop it if isn't running, or signal it to resume if it's paused.
@ -279,7 +288,7 @@ namespace fc
_unused_read_tokens += read_operation.permitted_length - bytes_read;
}
else
bytes_read = asio::read_some(socket, buffer, length);
bytes_read = asio::read_some(socket, buffer, length, offset);
_actual_download_rate.update(bytes_read);
@ -288,22 +297,22 @@ namespace fc
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
{
return writesome_impl(socket, buffer, length);
return writesome_impl(socket, buffer, length, 0);
}
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length)
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset)
{
return writesome_impl(socket, buffer, length);
return writesome_impl(socket, buffer, length, offset);
}
template <typename BufferType>
size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length)
size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset)
{
size_t bytes_written;
if (_upload_bytes_per_second)
{
promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::writesome"));
rate_limited_tcp_write_operation write_operation(socket, buffer, length, completion_promise);
rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise);
_write_operations_for_next_iteration.push_back(&write_operation);
// launch the write processing loop it if isn't running, or signal it to resume if it's paused.
@ -325,7 +334,7 @@ namespace fc
_unused_write_tokens += write_operation.permitted_length - bytes_written;
}
else
bytes_written = asio::write_some(socket, buffer, length);
bytes_written = asio::write_some(socket, buffer, length, offset);
_actual_upload_rate.update(bytes_written);

View file

@ -41,9 +41,9 @@ namespace fc {
}
}
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length) override;
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset) override;
fc::future<size_t> _write_in_progress;
fc::future<size_t> _read_in_progress;
@ -55,17 +55,17 @@ namespace fc {
{
return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait();
}
size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length)
size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
{
return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait();
return (_read_in_progress = fc::asio::read_some(socket, buffer, length, offset)).wait();
}
size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
{
return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait();
}
size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length)
size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset)
{
return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait();
return (_write_in_progress = fc::asio::write_some(socket, buffer, length, offset)).wait();
}
@ -101,9 +101,9 @@ namespace fc {
return my->_io_hooks->writesome(my->_sock, buf, len);
}
size_t tcp_socket::writesome(const std::shared_ptr<const char>& buf, size_t len)
size_t tcp_socket::writesome(const std::shared_ptr<const char>& buf, size_t len, size_t offset)
{
return my->_io_hooks->writesome(my->_sock, buf, len);
return my->_io_hooks->writesome(my->_sock, buf, len, offset);
}
fc::ip::endpoint tcp_socket::remote_endpoint()const
@ -132,8 +132,8 @@ namespace fc {
return my->_io_hooks->readsome(my->_sock, buf, len);
}
size_t tcp_socket::readsome( const std::shared_ptr<char>& buf, size_t len ) {
return my->_io_hooks->readsome(my->_sock, buf, len);
size_t tcp_socket::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset ) {
return my->_io_hooks->readsome(my->_sock, buf, len, offset);
}
void tcp_socket::connect_to( const fc::ip::endpoint& remote_endpoint ) {

View file

@ -244,6 +244,11 @@ namespace fc {
return bytes_read;
} FC_CAPTURE_AND_RETHROW( (max) ) }
size_t udt_socket::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
return readsome(buf.get() + offset, len);
}
bool udt_socket::eof()const
{
// TODO...
@ -274,6 +279,11 @@ namespace fc {
return bytes_sent;
} FC_CAPTURE_AND_RETHROW( (len) ) }
size_t udt_socket::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
return writesome(buf.get() + offset, len);
}
void udt_socket::flush(){}
void udt_socket::close()