diff --git a/CMakeLists.txt b/CMakeLists.txt index 7a93881..3574a29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -310,7 +310,7 @@ if(WIN32) endif() endforeach() - message("openssl_libraries=${OPENSSL_LIBRARIES}") + # message(STATUS "openssl_libraries=${OPENSSL_LIBRARIES}") foreach(lib ${OPENSSL_LIBRARIES}) get_filename_component(lib_name ${lib} NAME_WE) if (${lib_name} STREQUAL "libeay32") diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index 4eac5da..9310774 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -28,12 +28,14 @@ namespace asio { promise::ptr _completion_promise; }; - class read_write_handler_with_buffer : public read_write_handler + class read_write_handler_with_buffer { public: read_write_handler_with_buffer(const promise::ptr& p, const std::shared_ptr& buffer); + void operator()(const boost::system::error_code& ec, size_t bytes_transferred); private: + promise::ptr _completion_promise; std::shared_ptr _buffer; }; @@ -105,19 +107,19 @@ namespace asio { } template - future read_some(AsyncReadStream& s, char* buffer, size_t length) + future read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0) { promise::ptr completion_promise(new promise("fc::asio::async_read_some")); - s.async_read_some(boost::asio::buffer(buffer, length), + s.async_read_some(boost::asio::buffer(buffer + offset, length), detail::read_write_handler(completion_promise)); return completion_promise;//->wait(); } template - future read_some(AsyncReadStream& s, const std::shared_ptr& buffer, size_t length) + future read_some(AsyncReadStream& s, const std::shared_ptr& buffer, size_t length, size_t offset) { promise::ptr completion_promise(new promise("fc::asio::async_read_some")); - s.async_read_some(boost::asio::buffer(buffer.get(), length), + s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer)); return completion_promise;//->wait(); } @@ -137,9 +139,9 @@ namespace asio { template void async_read_some(AsyncReadStream& s, const std::shared_ptr& buffer, - size_t length, promise::ptr completion_promise) + size_t length, size_t offset, promise::ptr completion_promise) { - s.async_read_some(boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(completion_promise, buffer)); + s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer)); } template @@ -175,17 +177,17 @@ namespace asio { template future write_some( AsyncWriteStream& s, const char* buffer, - size_t length ) { + size_t length, size_t offset = 0) { promise::ptr p(new promise("fc::asio::write_some")); - s.async_write_some( boost::asio::buffer(buffer, length), detail::read_write_handler(p)); + s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p)); return p; //->wait(); } template future write_some( AsyncWriteStream& s, const std::shared_ptr& buffer, - size_t length ) { + size_t length, size_t offset ) { promise::ptr p(new promise("fc::asio::write_some")); - s.async_write_some( boost::asio::buffer(buffer.get(), length), detail::read_write_handler_with_buffer(p, buffer)); + s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer)); return p; //->wait(); } @@ -208,8 +210,8 @@ namespace asio { template void async_write_some(AsyncWriteStream& s, const std::shared_ptr& buffer, - size_t length, promise::ptr completion_promise) { - s.async_write_some(boost::asio::buffer(buffer.get(), length), + size_t length, size_t offset, promise::ptr completion_promise) { + s.async_write_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer)); } @@ -263,8 +265,11 @@ namespace asio { virtual size_t readsome( char* buf, size_t len ) { - auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) ).wait(); - return r; + return fc::asio::read_some(*_stream, buf, len).wait(); + } + virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + return fc::asio::read_some(*_stream, buf, len, offset).wait(); } private: @@ -280,7 +285,12 @@ namespace asio { virtual size_t writesome( const char* buf, size_t len ) { - return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) ).wait(); + return fc::asio::write_some(*_stream, buf, len).wait(); + } + + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + return fc::asio::write_some(*_stream, buf, len, offset).wait(); } virtual void close(){ _stream->close(); } diff --git a/include/fc/io/buffered_iostream.hpp b/include/fc/io/buffered_iostream.hpp index 5ebb566..f1803eb 100644 --- a/include/fc/io/buffered_iostream.hpp +++ b/include/fc/io/buffered_iostream.hpp @@ -32,6 +32,7 @@ namespace fc * @throws fc::eof if at least 1 byte cannot be read **/ virtual std::size_t readsome( char* buf, std::size_t len ); + virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ); /** * This method may block until at least 1 character is @@ -61,6 +62,7 @@ namespace fc * is full, in which case it will flush which may block. */ virtual size_t writesome( const char* buf, size_t len ); + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual void close(); virtual void flush(); diff --git a/include/fc/io/fstream.hpp b/include/fc/io/fstream.hpp index a11f915..6389c79 100644 --- a/include/fc/io/fstream.hpp +++ b/include/fc/io/fstream.hpp @@ -14,6 +14,7 @@ namespace fc { void open( const fc::path& file, int m = binary ); size_t writesome( const char* buf, size_t len ); + size_t writesome(const std::shared_ptr& buffer, size_t len, size_t offset); void put( char c ); void close(); void flush(); @@ -34,6 +35,7 @@ namespace fc { void open( const fc::path& file, int m ); size_t readsome( char* buf, size_t len ); + size_t readsome(const std::shared_ptr& buffer, size_t max, size_t offset); ifstream& read( char* buf, size_t len ); ifstream& seekg( size_t p, seekdir d = beg ); void get( char& c ) { read( &c, 1 ); } diff --git a/include/fc/io/iostream.hpp b/include/fc/io/iostream.hpp index f743048..582797f 100644 --- a/include/fc/io/iostream.hpp +++ b/include/fc/io/iostream.hpp @@ -21,6 +21,7 @@ namespace fc { * @throws fc::eof if at least 1 byte cannot be read **/ virtual size_t readsome( char* buf, size_t len ) = 0; + virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ) = 0; /** read len bytes or throw, this method is implemented * in terms of readsome. @@ -28,7 +29,7 @@ namespace fc { * @throws fc::eof_exception if len bytes cannot be read **/ istream& read( char* buf, size_t len ); - istream& read( const std::shared_ptr& buf, size_t len ); + istream& read( const std::shared_ptr& buf, size_t len, size_t offset = 0 ); char get(); }; typedef std::shared_ptr istream_ptr; @@ -42,6 +43,7 @@ namespace fc { public: virtual ~ostream(){}; virtual size_t writesome( const char* buf, size_t len ) = 0; + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ) = 0; virtual void close() = 0; virtual void flush() = 0; @@ -51,7 +53,7 @@ namespace fc { * but not flushed. **/ ostream& write( const char* buf, size_t len ); - ostream& write( const std::shared_ptr& buf, size_t len ); + ostream& write( const std::shared_ptr& buf, size_t len, size_t offset = 0 ); }; typedef std::shared_ptr ostream_ptr; diff --git a/include/fc/io/sstream.hpp b/include/fc/io/sstream.hpp index 0aacb7a..9010bfe 100644 --- a/include/fc/io/sstream.hpp +++ b/include/fc/io/sstream.hpp @@ -18,7 +18,9 @@ namespace fc { virtual bool eof()const; virtual size_t writesome( const char* buf, size_t len ); + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual size_t readsome( char* buf, size_t len ); + virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual void close(); virtual void flush(); char peek(); diff --git a/include/fc/io/stdio.hpp b/include/fc/io/stdio.hpp index 63d6e17..36ba091 100644 --- a/include/fc/io/stdio.hpp +++ b/include/fc/io/stdio.hpp @@ -7,6 +7,7 @@ namespace fc class cout_t : virtual public ostream { public: virtual size_t writesome( const char* buf, size_t len ); + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual void close(); virtual void flush(); }; @@ -14,6 +15,7 @@ namespace fc class cerr_t : virtual public ostream { public: virtual size_t writesome( const char* buf, size_t len ); + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual void close(); virtual void flush(); }; @@ -22,6 +24,7 @@ namespace fc public: ~cin_t(); virtual size_t readsome( char* buf, size_t len ); + virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual istream& read( char* buf, size_t len ); virtual bool eof()const; }; diff --git a/include/fc/network/tcp_socket.hpp b/include/fc/network/tcp_socket.hpp index 33ca221..cc8c158 100644 --- a/include/fc/network/tcp_socket.hpp +++ b/include/fc/network/tcp_socket.hpp @@ -32,14 +32,14 @@ namespace fc { /// istream interface /// @{ virtual size_t readsome( char* buffer, size_t max ); - virtual size_t readsome( const std::shared_ptr& buffer, size_t max ); + virtual size_t readsome(const std::shared_ptr& buffer, size_t max, size_t offset); virtual bool eof()const; /// @} /// ostream interface /// @{ virtual size_t writesome( const char* buffer, size_t len ); - virtual size_t writesome( const std::shared_ptr& buffer, size_t len ); + virtual size_t writesome(const std::shared_ptr& buffer, size_t len, size_t offset); virtual void flush(); virtual void close(); /// @} diff --git a/include/fc/network/tcp_socket_io_hooks.hpp b/include/fc/network/tcp_socket_io_hooks.hpp index e3d98dd..0d373a7 100644 --- a/include/fc/network/tcp_socket_io_hooks.hpp +++ b/include/fc/network/tcp_socket_io_hooks.hpp @@ -8,8 +8,8 @@ namespace fc public: virtual ~tcp_socket_io_hooks() {} virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) = 0; - virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) = 0; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) = 0; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) = 0; - virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) = 0; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) = 0; }; } // namesapce fc diff --git a/include/fc/network/udt_socket.hpp b/include/fc/network/udt_socket.hpp index aa1dd49..ea17f77 100644 --- a/include/fc/network/udt_socket.hpp +++ b/include/fc/network/udt_socket.hpp @@ -29,12 +29,14 @@ namespace fc { /// istream interface /// @{ virtual size_t readsome( char* buffer, size_t max ); + virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual bool eof()const; /// @} /// ostream interface /// @{ virtual size_t writesome( const char* buffer, size_t len ); + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ); virtual void flush(); virtual void close(); /// @} diff --git a/src/asio.cpp b/src/asio.cpp index 48e2855..8f6bff0 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -9,9 +9,12 @@ namespace fc { read_write_handler::read_write_handler(const promise::ptr& completion_promise) : _completion_promise(completion_promise) - {} + { + // assert(false); // to detect anywhere we're not passing in a shared buffer + } void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred) { + // assert(false); // to detect anywhere we're not passing in a shared buffer if( !ec ) _completion_promise->set_value(bytes_transferred); else if( ec == boost::asio::error::eof ) @@ -21,10 +24,18 @@ namespace fc { } read_write_handler_with_buffer::read_write_handler_with_buffer(const promise::ptr& completion_promise, const std::shared_ptr& buffer) : - read_write_handler(completion_promise), + _completion_promise(completion_promise), _buffer(buffer) {} - + void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred) + { + if( !ec ) + _completion_promise->set_value(bytes_transferred); + else if( ec == boost::asio::error::eof ) + _completion_promise->set_exception( fc::exception_ptr( new fc::eof_exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + else + _completion_promise->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); + } void read_write_handler_ec( promise* p, boost::system::error_code* oec, const boost::system::error_code& ec, size_t bytes_transferred ) { p->set_value(bytes_transferred); diff --git a/src/io/buffered_iostream.cpp b/src/io/buffered_iostream.cpp index d6cf1f9..0760621 100644 --- a/src/io/buffered_iostream.cpp +++ b/src/io/buffered_iostream.cpp @@ -12,11 +12,19 @@ namespace fc class buffered_istream_impl { public: - buffered_istream_impl( istream_ptr is ) - :_istr(fc::move(is)){} + buffered_istream_impl( istream_ptr is ) : + _istr(fc::move(is)) +#ifndef NDEBUG + ,_shared_read_buffer_in_use(false) +#endif + {} istream_ptr _istr; boost::asio::streambuf _rdbuf; + std::shared_ptr _shared_read_buffer; +#ifndef NDEBUG + bool _shared_read_buffer_in_use; +#endif }; static const size_t minimum_read_size = 1024; } @@ -64,6 +72,43 @@ namespace fc return bytes_to_deliver_immediately; } + size_t buffered_istream::readsome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + size_t bytes_from_rdbuf = static_cast(my->_rdbuf.sgetn(buf.get() + offset, len)); + if (bytes_from_rdbuf) + return bytes_from_rdbuf; + + + if( len > detail::minimum_read_size ) + return my->_istr->readsome(buf.get() + offset, len); + +#ifndef NDEBUG + // This code was written with the assumption that you'd only be making one call to readsome + // at a time so it reuses _shared_read_buffer. If you really need to make concurrent calls to + // readsome(), you'll need to prevent reusing _shared_read_buffer here + struct check_buffer_in_use { + bool& _buffer_in_use; + check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; } + ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; } + } buffer_in_use_checker(my->_shared_read_buffer_in_use); +#endif + + if (!my->_shared_read_buffer) + my->_shared_read_buffer.reset(new char[detail::minimum_read_size], [](char* p){ delete[] p; }); + size_t bytes_read = my->_istr->readsome( my->_shared_read_buffer, detail::minimum_read_size, 0 ); + + size_t bytes_to_deliver_immediately = std::min(bytes_read,len); + + memcpy( buf.get() + offset, my->_shared_read_buffer.get(), bytes_to_deliver_immediately ); + + if( bytes_read > len ) + { + my->_rdbuf.sputn( my->_shared_read_buffer.get() + len, bytes_read - len ); + } + + return bytes_to_deliver_immediately; + } + char buffered_istream::peek()const { if( my->_rdbuf.size() ) @@ -89,11 +134,19 @@ namespace fc class buffered_ostream_impl { public: - buffered_ostream_impl( ostream_ptr os ) - :_ostr(fc::move(os)){} + buffered_ostream_impl( ostream_ptr os ) : + _ostr(fc::move(os)) +#ifndef NDEBUG + ,_shared_write_buffer_in_use(false) +#endif + {} ostream_ptr _ostr; boost::asio::streambuf _rdbuf; + std::shared_ptr _shared_write_buffer; +#ifndef NDEBUG + bool _shared_write_buffer_in_use; +#endif }; } @@ -120,11 +173,29 @@ namespace fc return written + static_cast(my->_rdbuf.sputn( buf+written, len-written )); } + size_t buffered_ostream::writesome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + return writesome(buf.get() + offset, len); + } + void buffered_ostream::flush() { - char buffer[2048]; - while( size_t bytes_from_rdbuf = static_cast(my->_rdbuf.sgetn(buffer,sizeof(buffer))) ) - my->_ostr->write( buffer, bytes_from_rdbuf ); +#ifndef NDEBUG + // This code was written with the assumption that you'd only be making one call to flush + // at a time so it reuses _shared_write_buffer. If you really need to make concurrent calls to + // flush(), you'll need to prevent reusing _shared_write_buffer here + struct check_buffer_in_use { + bool& _buffer_in_use; + check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; } + ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; } + } buffer_in_use_checker(my->_shared_write_buffer_in_use); +#endif + const size_t write_buffer_size = 2048; + if (!my->_shared_write_buffer) + my->_shared_write_buffer.reset(new char[write_buffer_size], [](char* p){ delete[] p; }); + + while( size_t bytes_from_rdbuf = static_cast(my->_rdbuf.sgetn(my->_shared_write_buffer.get(), write_buffer_size)) ) + my->_ostr->write( my->_shared_write_buffer, bytes_from_rdbuf ); my->_ostr->flush(); } diff --git a/src/io/fstream.cpp b/src/io/fstream.cpp index 8107f42..824e44e 100644 --- a/src/io/fstream.cpp +++ b/src/io/fstream.cpp @@ -31,6 +31,11 @@ namespace fc { my->ofs.write(buf,len); return len; } + size_t ofstream::writesome(const std::shared_ptr& buffer, size_t len, size_t offset) + { + return writesome(buffer.get() + offset, len); + } + void ofstream::put( char c ) { my->ofs.put(c); } @@ -62,6 +67,11 @@ namespace fc { } return s; } + size_t ifstream::readsome(const std::shared_ptr& buffer, size_t max, size_t offset) + { + return readsome(buffer.get() + offset, max); + } + ifstream& ifstream::read( char* buf, size_t len ) { if( eof() ) FC_THROW_EXCEPTION( eof_exception , ""); my->ifs.read(buf,len); diff --git a/src/io/iostream.cpp b/src/io/iostream.cpp index ddb370e..02acafb 100644 --- a/src/io/iostream.cpp +++ b/src/io/iostream.cpp @@ -96,10 +96,12 @@ namespace fc { size_t cout_t::writesome( const char* buf, size_t len ) { std::cout.write(buf,len); return len; } + size_t cout_t::writesome( const std::shared_ptr& buf, size_t len, size_t offset ) { return writesome(buf.get() + offset, len); } void cout_t::close() {} void cout_t::flush() { std::cout.flush(); } size_t cerr_t::writesome( const char* buf, size_t len ) { std::cerr.write(buf,len); return len; } + size_t cerr_t::writesome( const std::shared_ptr& buf, size_t len, size_t offset ) { return writesome(buf.get() + offset, len); } void cerr_t::close() {}; void cerr_t::flush() { std::cerr.flush(); } @@ -127,6 +129,7 @@ namespace fc { } return size_t(u); } + size_t cin_t::readsome( const std::shared_ptr& buf, size_t len, size_t offset ) { return readsome(buf.get() + offset, len); } cin_t::~cin_t() { /* @@ -358,12 +361,12 @@ namespace fc { return *this; } - istream& istream::read( const std::shared_ptr& buf, size_t len ) + istream& istream::read( const std::shared_ptr& buf, size_t len, size_t offset ) { - char* pos = buf.get(); - while( size_t(pos-buf.get()) < len ) - pos += readsome( pos, len - (pos - buf.get()) ); - return *this; + size_t bytes_read = 0; + while( bytes_read < len ) + bytes_read += readsome(buf, len - bytes_read, bytes_read + offset); + return *this; } ostream& ostream::write( const char* buf, size_t len ) @@ -374,12 +377,12 @@ namespace fc { return *this; } - ostream& ostream::write( const std::shared_ptr& buf, size_t len ) + ostream& ostream::write( const std::shared_ptr& buf, size_t len, size_t offset ) { - const char* pos = buf.get(); - while( size_t(pos-buf.get()) < len ) - pos += writesome( pos, len - (pos - buf.get()) ); - return *this; + size_t bytes_written = 0; + while( bytes_written < len ) + bytes_written += writesome(buf, len - bytes_written, bytes_written + offset); + return *this; } } // namespace fc diff --git a/src/io/sstream.cpp b/src/io/sstream.cpp index aed1575..9c4739e 100644 --- a/src/io/sstream.cpp +++ b/src/io/sstream.cpp @@ -54,6 +54,11 @@ namespace fc { } return len; } + size_t stringstream::writesome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + return writesome(buf.get() + offset, len); + } + size_t stringstream::readsome( char* buf, size_t len ) { size_t r = static_cast(my->ss.readsome(buf,len)); if( my->ss.eof() || r == 0 ) @@ -62,6 +67,12 @@ namespace fc { } return r; } + size_t stringstream::readsome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + return readsome(buf.get() + offset, len); + } + + void stringstream::close(){ my->ss.flush(); }; void stringstream::flush(){ my->ss.flush(); }; diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index 2647ca4..1c15af1 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -21,12 +21,15 @@ namespace fc { public: size_t length; + size_t offset; size_t permitted_length; promise::ptr completion_promise; rate_limited_operation(size_t length, + size_t offset, promise::ptr&& completion_promise) : length(length), + offset(offset), permitted_length(0), completion_promise(completion_promise) {} @@ -44,16 +47,20 @@ namespace fc rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length, + size_t offset, promise::ptr completion_promise) : - rate_limited_operation(length, std::move(completion_promise)), + rate_limited_operation(length, offset, std::move(completion_promise)), socket(socket), raw_buffer(buffer) - {} + { + assert(false); + } rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, + size_t offset, promise::ptr completion_promise) : - rate_limited_operation(length, std::move(completion_promise)), + rate_limited_operation(length, offset, std::move(completion_promise)), socket(socket), raw_buffer(nullptr), shared_buffer(buffer) @@ -66,7 +73,7 @@ namespace fc completion_promise); else asio::async_write_some(socket, - shared_buffer, permitted_length, + shared_buffer, permitted_length, offset, completion_promise); } }; @@ -81,16 +88,18 @@ namespace fc rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length, + size_t offset, promise::ptr completion_promise) : - rate_limited_operation(length, std::move(completion_promise)), + rate_limited_operation(length, offset, std::move(completion_promise)), socket(socket), raw_buffer(buffer) {} rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, + size_t offset, promise::ptr completion_promise) : - rate_limited_operation(length, std::move(completion_promise)), + rate_limited_operation(length, offset, std::move(completion_promise)), socket(socket), raw_buffer(nullptr), shared_buffer(buffer) @@ -103,7 +112,7 @@ namespace fc completion_promise); else asio::async_read_some(socket, - shared_buffer, permitted_length, + shared_buffer, permitted_length, offset, completion_promise); } @@ -209,13 +218,13 @@ namespace fc uint32_t burstiness_in_seconds = 1); virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; - virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) override; template - size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length); + size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset); virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; - virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) override; template - size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length); + size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset); void process_pending_reads(); void process_pending_writes(); @@ -240,24 +249,24 @@ namespace fc { } - size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) { - return readsome_impl(socket, buffer, length); + return readsome_impl(socket, buffer, length, offset); } size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) { - return readsome_impl(socket, buffer, length); + return readsome_impl(socket, buffer, length, 0); } template - size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length) + size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset) { size_t bytes_read; if (_download_bytes_per_second) { promise::ptr completion_promise(new promise("rate_limiting_group_impl::readsome")); - rate_limited_tcp_read_operation read_operation(socket, buffer, length, completion_promise); + rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise); _read_operations_for_next_iteration.push_back(&read_operation); // launch the read processing loop it if isn't running, or signal it to resume if it's paused. @@ -279,7 +288,7 @@ namespace fc _unused_read_tokens += read_operation.permitted_length - bytes_read; } else - bytes_read = asio::read_some(socket, buffer, length); + bytes_read = asio::read_some(socket, buffer, length, offset); _actual_download_rate.update(bytes_read); @@ -288,22 +297,22 @@ namespace fc size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) { - return writesome_impl(socket, buffer, length); + return writesome_impl(socket, buffer, length, 0); } - size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) { - return writesome_impl(socket, buffer, length); + return writesome_impl(socket, buffer, length, offset); } template - size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length) + size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset) { size_t bytes_written; if (_upload_bytes_per_second) { promise::ptr completion_promise(new promise("rate_limiting_group_impl::writesome")); - rate_limited_tcp_write_operation write_operation(socket, buffer, length, completion_promise); + rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise); _write_operations_for_next_iteration.push_back(&write_operation); // launch the write processing loop it if isn't running, or signal it to resume if it's paused. @@ -325,7 +334,7 @@ namespace fc _unused_write_tokens += write_operation.permitted_length - bytes_written; } else - bytes_written = asio::write_some(socket, buffer, length); + bytes_written = asio::write_some(socket, buffer, length, offset); _actual_upload_rate.update(bytes_written); diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp index 1d67897..f5a3617 100644 --- a/src/network/tcp_socket.cpp +++ b/src/network/tcp_socket.cpp @@ -41,9 +41,9 @@ namespace fc { } } virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; - virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) override; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; - virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) override; + virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) override; fc::future _write_in_progress; fc::future _read_in_progress; @@ -55,17 +55,17 @@ namespace fc { { return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait(); } - size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) { - return (_read_in_progress = fc::asio::read_some(socket, buffer, length)).wait(); + return (_read_in_progress = fc::asio::read_some(socket, buffer, length, offset)).wait(); } size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) { return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait(); } - size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length) + size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr& buffer, size_t length, size_t offset) { - return (_write_in_progress = fc::asio::write_some(socket, buffer, length)).wait(); + return (_write_in_progress = fc::asio::write_some(socket, buffer, length, offset)).wait(); } @@ -101,9 +101,9 @@ namespace fc { return my->_io_hooks->writesome(my->_sock, buf, len); } - size_t tcp_socket::writesome(const std::shared_ptr& buf, size_t len) + size_t tcp_socket::writesome(const std::shared_ptr& buf, size_t len, size_t offset) { - return my->_io_hooks->writesome(my->_sock, buf, len); + return my->_io_hooks->writesome(my->_sock, buf, len, offset); } fc::ip::endpoint tcp_socket::remote_endpoint()const @@ -132,8 +132,8 @@ namespace fc { return my->_io_hooks->readsome(my->_sock, buf, len); } - size_t tcp_socket::readsome( const std::shared_ptr& buf, size_t len ) { - return my->_io_hooks->readsome(my->_sock, buf, len); + size_t tcp_socket::readsome( const std::shared_ptr& buf, size_t len, size_t offset ) { + return my->_io_hooks->readsome(my->_sock, buf, len, offset); } void tcp_socket::connect_to( const fc::ip::endpoint& remote_endpoint ) { diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp index 3ac14a6..655e16c 100644 --- a/src/network/udt_socket.cpp +++ b/src/network/udt_socket.cpp @@ -244,6 +244,11 @@ namespace fc { return bytes_read; } FC_CAPTURE_AND_RETHROW( (max) ) } + size_t udt_socket::readsome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + return readsome(buf.get() + offset, len); + } + bool udt_socket::eof()const { // TODO... @@ -274,6 +279,11 @@ namespace fc { return bytes_sent; } FC_CAPTURE_AND_RETHROW( (len) ) } + size_t udt_socket::writesome( const std::shared_ptr& buf, size_t len, size_t offset ) + { + return writesome(buf.get() + offset, len); + } + void udt_socket::flush(){} void udt_socket::close()