diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f82b21..281e6f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -226,7 +226,7 @@ target_link_libraries( fc PUBLIC easylzma_static ${Boost_LIBRARIES} ${OPENSSL_LI add_executable( ntp_test ntp_test.cpp ) target_link_libraries( ntp_test fc ) -include_directories( vendor/udt4/src ) +#include_directories( vendor/udt4/src ) #add_executable( udt_server tests/udt_server.cpp ) #target_link_libraries( udt_server fc udt ) diff --git a/include/fc/network/rate_limiting.hpp b/include/fc/network/rate_limiting.hpp index 7694789..4b798d5 100644 --- a/include/fc/network/rate_limiting.hpp +++ b/include/fc/network/rate_limiting.hpp @@ -3,6 +3,8 @@ #include +#include + namespace fc { namespace detail @@ -15,7 +17,7 @@ namespace fc class rate_limiting_group { public: - rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second); + rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds = 1); ~rate_limiting_group(); void set_upload_limit(uint32_t upload_bytes_per_second); @@ -24,6 +26,10 @@ namespace fc void set_download_limit(uint32_t download_bytes_per_second); uint32_t get_download_limit() const; + uint32_t get_actual_upload_rate() const; + uint32_t get_actual_download_rate() const; + void set_actual_rate_time_constant(microseconds time_constant); + void add_tcp_socket(tcp_socket* tcp_socket_to_limit); void remove_tcp_socket(tcp_socket* tcp_socket_to_stop_limiting); private: diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index 59db1a5..fe69277 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -86,11 +86,69 @@ namespace fc } }; + class average_rate_meter + { + private: + mutable double _average_rate; + mutable uint32_t _unaccounted_bytes; + mutable time_point _last_update_time; + microseconds _time_constant; + + void update_const(uint32_t bytes_transferred = 0) const; + public: + average_rate_meter(const microseconds& time_constant = seconds(10)); + void set_time_constant(const microseconds& time_constant); + void update(uint32_t bytes_transferred = 0); + uint32_t get_average_rate() const; + }; + average_rate_meter::average_rate_meter(const microseconds& time_constant) : + _average_rate(0.), + _unaccounted_bytes(0), + _last_update_time(time_point_sec::min()), + _time_constant(time_constant) + {} + void average_rate_meter::set_time_constant(const microseconds& time_constant) + { + _time_constant = time_constant; + } + void average_rate_meter::update(uint32_t bytes_transferred /* = 0 */) + { + update_const(bytes_transferred); + } + void average_rate_meter::update_const(uint32_t bytes_transferred /* = 0 */) const + { + time_point now = time_point::now(); + if (now <= _last_update_time) + _unaccounted_bytes += bytes_transferred; + else + { + microseconds time_since_last_update = now - _last_update_time; + if (time_since_last_update > _time_constant) + _average_rate = bytes_transferred / (_time_constant.count() / (double)seconds(1).count()); + else + { + bytes_transferred += _unaccounted_bytes; + double seconds_since_last_update = time_since_last_update.count() / (double)seconds(1).count(); + double rate_since_last_update = bytes_transferred / seconds_since_last_update; + double alpha = time_since_last_update.count() / (double)_time_constant.count(); + _average_rate = rate_since_last_update * alpha + _average_rate * (1.0 - alpha); + } + _last_update_time = now; + _unaccounted_bytes = 0; + } + } + uint32_t average_rate_meter::get_average_rate() const + { + update_const(); + return (uint32_t)_average_rate; + } + class rate_limiting_group_impl : public tcp_socket_io_hooks { public: uint32_t _upload_bytes_per_second; uint32_t _download_bytes_per_second; + uint32_t _burstiness_in_seconds; microseconds _granularity; // how often to add tokens to the bucket uint32_t _read_tokens; @@ -112,7 +170,11 @@ namespace fc future _process_pending_writes_loop_complete; promise::ptr _new_write_operation_available_promise; - rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second); + average_rate_meter _actual_upload_rate; + average_rate_meter _actual_download_rate; + + rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, + uint32_t burstiness_in_seconds = 1); virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; @@ -127,9 +189,11 @@ namespace fc uint32_t& unused_tokens); }; - rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second) : + rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, + uint32_t burstiness_in_seconds) : _upload_bytes_per_second(upload_bytes_per_second), _download_bytes_per_second(download_bytes_per_second), + _burstiness_in_seconds(burstiness_in_seconds), _granularity(milliseconds(50)), _read_tokens(_download_bytes_per_second), _unused_read_tokens(0), @@ -140,6 +204,7 @@ namespace fc size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) { + size_t bytes_read; if (_download_bytes_per_second) { promise::ptr completion_promise(new promise()); @@ -152,15 +217,19 @@ namespace fc else if (_new_read_operation_available_promise) _new_read_operation_available_promise->set_value(); - size_t bytes_read = completion_promise->wait(); + bytes_read = completion_promise->wait(); _unused_read_tokens += read_operation.permitted_length - bytes_read; - return bytes_read; } else - return asio::read_some(socket, boost::asio::buffer(buffer, length)); + bytes_read = asio::read_some(socket, boost::asio::buffer(buffer, length)); + + _actual_download_rate.update(bytes_read); + + return bytes_read; } size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) { + size_t bytes_written; if (_upload_bytes_per_second) { promise::ptr completion_promise(new promise()); @@ -173,12 +242,15 @@ namespace fc else if (_new_write_operation_available_promise) _new_write_operation_available_promise->set_value(); - size_t bytes_written = completion_promise->wait(); + bytes_written = completion_promise->wait(); _unused_write_tokens += write_operation.permitted_length - bytes_written; - return bytes_written; } else - return asio::write_some(socket, boost::asio::buffer(buffer, length)); + bytes_written = asio::write_some(socket, boost::asio::buffer(buffer, length)); + + _actual_upload_rate.update(bytes_written); + + return bytes_written; } void rate_limiting_group_impl::process_pending_reads() { @@ -248,7 +320,7 @@ namespace fc tokens += (uint32_t)((limit_bytes_per_second * time_since_last_iteration.count()) / 1000000); tokens += unused_tokens; unused_tokens = 0; - tokens = std::min(tokens, limit_bytes_per_second); + tokens = std::min(tokens, limit_bytes_per_second * _burstiness_in_seconds); if (tokens) { @@ -303,8 +375,8 @@ namespace fc } - rate_limiting_group::rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second) : - my(new detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second)) + rate_limiting_group::rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds /* = 1 */) : + my(new detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second, burstiness_in_seconds)) { } @@ -312,6 +384,22 @@ namespace fc { } + uint32_t rate_limiting_group::get_actual_upload_rate() const + { + return my->_actual_upload_rate.get_average_rate(); + } + + uint32_t rate_limiting_group::get_actual_download_rate() const + { + return my->_actual_download_rate.get_average_rate(); + } + + void rate_limiting_group::set_actual_rate_time_constant(microseconds time_constant) + { + my->_actual_upload_rate.set_time_constant(time_constant); + my->_actual_download_rate.set_time_constant(time_constant); + } + void rate_limiting_group::set_upload_limit(uint32_t upload_bytes_per_second) { my->_upload_bytes_per_second = upload_bytes_per_second; diff --git a/tests/rate_limiting.cpp b/tests/rate_limiting.cpp index dd454a1..e33b2ff 100644 --- a/tests/rate_limiting.cpp +++ b/tests/rate_limiting.cpp @@ -24,11 +24,25 @@ void download_url(const std::string& ip_address, const std::string& url) int main( int argc, char** argv ) { + rate_limiter.set_actual_rate_time_constant(fc::seconds(1)); + std::vector > download_futures; download_futures.push_back(fc::async([](){ download_url("198.82.184.145", "http://mirror.cs.vt.edu/pub/cygwin/glibc/releases/glibc-2.9.tar.gz"); })); download_futures.push_back(fc::async([](){ download_url("198.82.184.145", "http://mirror.cs.vt.edu/pub/cygwin/glibc/releases/glibc-2.7.tar.gz"); })); - for (int i = 0; i < download_futures.size(); ++i) + while (1) + { + bool all_done = true; + for (unsigned i = 0; i < download_futures.size(); ++i) + if (!download_futures[i].ready()) + all_done = false; + if (all_done) + break; + std::cout << "Current measurement of actual transfer rate: upload " << rate_limiter.get_actual_upload_rate() << ", download " << rate_limiter.get_actual_download_rate() << "\n"; + fc::usleep(fc::seconds(1)); + } + + for (unsigned i = 0; i < download_futures.size(); ++i) download_futures[i].wait(); return 0; } diff --git a/vendor/udt4/src/common.h b/vendor/udt4/src/common.h index 3782d61..20c0bb4 100644 --- a/vendor/udt4/src/common.h +++ b/vendor/udt4/src/common.h @@ -50,6 +50,7 @@ written by #include #endif #include +#include #include "udt.h" diff --git a/vendor/udt4/src/udt.h b/vendor/udt4/src/udt.h index 6436363..7dc75a3 100644 --- a/vendor/udt4/src/udt.h +++ b/vendor/udt4/src/udt.h @@ -58,6 +58,8 @@ written by #include #include +#include + //////////////////////////////////////////////////////////////////////////////// @@ -69,7 +71,7 @@ written by #ifdef WIN32 - #ifndef __MINGW__ + #if !defined(__MINGW__) && defined(UDT_IS_DLL) // Explicitly define 32-bit and 64-bit numbers typedef __int32 int32_t; typedef __int64 int64_t;