From 62b479568e2dcad49cfa4e7149c230b49a1829c0 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 8 May 2014 14:55:51 -0400 Subject: [PATCH] Get rate limiting mostly working --- src/network/rate_limiting.cpp | 111 +++++++++++++++++++++++++--------- tests/rate_limiting.cpp | 18 ++++-- 2 files changed, 98 insertions(+), 31 deletions(-) diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index f41eaa0..0055abb 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -90,9 +90,13 @@ namespace fc uint32_t _upload_bytes_per_second; uint32_t _download_bytes_per_second; - microseconds _granularity; + microseconds _granularity; // how often to add tokens to the bucket + uint32_t _read_tokens; + uint32_t _unused_read_tokens; // gets filled with tokens for unused bytes (if I'm allowed to read 200 bytes and I try to read 200 bytes, but can only read 50, tokens for the other 150 get returned here) + uint32_t _write_tokens; + uint32_t _unused_write_tokens; - typedef std::list > rate_limited_operation_list; + typedef std::list rate_limited_operation_list; rate_limited_operation_list _read_operations_in_progress; rate_limited_operation_list _read_operations_for_next_iteration; rate_limited_operation_list _write_operations_in_progress; @@ -101,8 +105,10 @@ namespace fc time_point _last_read_iteration_time; time_point _last_write_iteration_time; - fc::future _process_pending_reads_loop_complete; - fc::future _process_pending_writes_loop_complete; + future _process_pending_reads_loop_complete; + promise::ptr _new_read_operation_available_promise; + future _process_pending_writes_loop_complete; + promise::ptr _new_write_operation_available_promise; rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second); @@ -114,13 +120,19 @@ namespace fc void process_pending_operations(time_point& last_iteration_start_time, uint32_t& limit_bytes_per_second, rate_limited_operation_list& operations_in_progress, - rate_limited_operation_list& operations_for_next_iteration); + rate_limited_operation_list& operations_for_next_iteration, + uint32_t& tokens, + uint32_t& unused_tokens); }; rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second) : _upload_bytes_per_second(upload_bytes_per_second), _download_bytes_per_second(download_bytes_per_second), - _granularity(fc::milliseconds(50)) + _granularity(milliseconds(50)), + _read_tokens(_download_bytes_per_second), + _unused_read_tokens(0), + _write_tokens(_upload_bytes_per_second), + _unused_write_tokens(0) { } @@ -129,10 +141,18 @@ namespace fc if (_download_bytes_per_second) { promise::ptr completion_promise(new promise()); - _read_operations_for_next_iteration.emplace_back(std::make_unique(socket, buffer, length, completion_promise)); - if (!_process_pending_reads_loop_complete.valid()) + rate_limited_tcp_read_operation read_operation(socket, buffer, length, completion_promise); + _read_operations_for_next_iteration.push_back(&read_operation); + + // launch the read processing loop it if isn't running, or signal it to resume if it's paused. + if (!_process_pending_reads_loop_complete.valid() || _process_pending_reads_loop_complete.ready()) _process_pending_reads_loop_complete = async([=](){ process_pending_reads(); }); - return completion_promise->wait(); + else if (_new_read_operation_available_promise) + _new_read_operation_available_promise->set_value(); + + size_t bytes_read = completion_promise->wait(); + _unused_read_tokens += read_operation.permitted_length - bytes_read; + return bytes_read; } else return asio::read_some(socket, boost::asio::buffer(buffer, length)); @@ -142,10 +162,18 @@ namespace fc if (_upload_bytes_per_second) { promise::ptr completion_promise(new promise()); - _write_operations_for_next_iteration.emplace_back(std::make_unique(socket, buffer, length, completion_promise)); - if (!_process_pending_writes_loop_complete.valid()) + rate_limited_tcp_write_operation write_operation(socket, buffer, length, completion_promise); + _write_operations_for_next_iteration.push_back(&write_operation); + + // launch the write processing loop it if isn't running, or signal it to resume if it's paused. + if (!_process_pending_writes_loop_complete.valid() || _process_pending_writes_loop_complete.ready()) _process_pending_writes_loop_complete = async([=](){ process_pending_writes(); }); - return completion_promise->wait(); + else if (_new_write_operation_available_promise) + _new_write_operation_available_promise->set_value(); + + size_t bytes_written = completion_promise->wait(); + _unused_write_tokens += write_operation.permitted_length - bytes_written; + return bytes_written; } else return asio::write_some(socket, boost::asio::buffer(buffer, length)); @@ -155,8 +183,20 @@ namespace fc for (;;) { process_pending_operations(_last_read_iteration_time, _download_bytes_per_second, - _read_operations_in_progress, _read_operations_for_next_iteration); - fc::usleep(_granularity); + _read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens); + + _new_read_operation_available_promise = new promise(); + try + { + if (_read_operations_in_progress.empty()) + _new_read_operation_available_promise->wait(); + else + _new_read_operation_available_promise->wait(_granularity); + } + catch (const timeout_exception&) + { + } + _new_read_operation_available_promise.reset(); } } void rate_limiting_group_impl::process_pending_writes() @@ -164,18 +204,32 @@ namespace fc for (;;) { process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second, - _write_operations_in_progress, _write_operations_for_next_iteration); - fc::usleep(_granularity); + _write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens); + + _new_write_operation_available_promise = new promise(); + try + { + if (_write_operations_in_progress.empty()) + _new_write_operation_available_promise->wait(); + else + _new_write_operation_available_promise->wait(_granularity); + } + catch (const timeout_exception&) + { + } + _new_write_operation_available_promise.reset(); } } void rate_limiting_group_impl::process_pending_operations(time_point& last_iteration_start_time, uint32_t& limit_bytes_per_second, rate_limited_operation_list& operations_in_progress, - rate_limited_operation_list& operations_for_next_iteration) + rate_limited_operation_list& operations_for_next_iteration, + uint32_t& tokens, + uint32_t& unused_tokens) { // lock here for multithreaded - std::copy(std::make_move_iterator(operations_for_next_iteration.begin()), - std::make_move_iterator(operations_for_next_iteration.end()), + std::copy(operations_for_next_iteration.begin(), + operations_for_next_iteration.end(), std::back_inserter(operations_in_progress)); operations_for_next_iteration.clear(); @@ -188,21 +242,23 @@ namespace fc time_since_last_iteration = seconds(1); else if (time_since_last_iteration < microseconds(0)) time_since_last_iteration = microseconds(0); + + tokens += (uint32_t)((limit_bytes_per_second * time_since_last_iteration.count()) / 1000000); + tokens += unused_tokens; + unused_tokens = 0; + tokens = std::min(tokens, limit_bytes_per_second); - uint32_t total_bytes_for_this_iteration = time_since_last_iteration.count() ? - (uint32_t)((1000000 * limit_bytes_per_second) / time_since_last_iteration.count()) : - 0; - if (total_bytes_for_this_iteration) + if (tokens) { // sort the pending reads/writes in order of the number of bytes they need to write, smallest first std::vector operations_sorted_by_length; operations_sorted_by_length.reserve(operations_in_progress.size()); - for (std::unique_ptr& operation_data : operations_in_progress) - operations_sorted_by_length.push_back(operation_data.get()); + for (rate_limited_operation* operation_data : operations_in_progress) + operations_sorted_by_length.push_back(operation_data); std::sort(operations_sorted_by_length.begin(), operations_sorted_by_length.end(), is_operation_shorter()); // figure out how many bytes each reader/writer is allowed to read/write - uint32_t bytes_remaining_to_allocate = total_bytes_for_this_iteration; + uint32_t bytes_remaining_to_allocate = tokens; while (!operations_sorted_by_length.empty()) { uint32_t bytes_permitted_for_this_operation = bytes_remaining_to_allocate / operations_sorted_by_length.size(); @@ -211,6 +267,7 @@ namespace fc bytes_remaining_to_allocate -= bytes_allocated_for_this_operation; operations_sorted_by_length.pop_back(); } + tokens = bytes_remaining_to_allocate; // kick off the reads/writes in first-come order for (auto iter = operations_in_progress.begin(); iter != operations_in_progress.end();) @@ -225,7 +282,7 @@ namespace fc } } } - else // upload speed is unlimited + else // down/upload speed is unlimited { // we shouldn't end up here often. If the rate is unlimited, we should just execute // the operation immediately without being queued up. This should only be hit if diff --git a/tests/rate_limiting.cpp b/tests/rate_limiting.cpp index 1ca31f3..dd454a1 100644 --- a/tests/rate_limiting.cpp +++ b/tests/rate_limiting.cpp @@ -5,20 +5,30 @@ #include #include +fc::rate_limiting_group rate_limiter(1000000, 1000000); -int main( int argc, char** argv ) +void download_url(const std::string& ip_address, const std::string& url) { - fc::rate_limiting_group rate_limiter(1000000,1000000); fc::http::connection http_connection; rate_limiter.add_tcp_socket(&http_connection.get_socket()); - http_connection.connect_to(fc::ip::endpoint(fc::ip::address("162.243.115.24"),80)); + http_connection.connect_to(fc::ip::endpoint(fc::ip::address(ip_address.c_str()), 80)); std::cout << "Starting download...\n"; fc::time_point start_time(fc::time_point::now()); - fc::http::reply reply = http_connection.request("GET", "http://invictus.io/bin/Keyhotee-0.7.0.dmg"); + fc::http::reply reply = http_connection.request("GET", "http://mirror.cs.vt.edu/pub/cygwin/glibc/releases/glibc-2.9.tar.gz"); fc::time_point end_time(fc::time_point::now()); std::cout << "HTTP return code: " << reply.status << "\n"; std::cout << "Retreived " << reply.body.size() << " bytes in " << ((end_time - start_time).count() / fc::milliseconds(1).count()) << "ms\n"; std::cout << "Average speed " << ((1000 * (uint64_t)reply.body.size()) / ((end_time - start_time).count() / fc::milliseconds(1).count())) << " bytes per second"; +} + +int main( int argc, char** argv ) +{ + std::vector > download_futures; + download_futures.push_back(fc::async([](){ download_url("198.82.184.145", "http://mirror.cs.vt.edu/pub/cygwin/glibc/releases/glibc-2.9.tar.gz"); })); + download_futures.push_back(fc::async([](){ download_url("198.82.184.145", "http://mirror.cs.vt.edu/pub/cygwin/glibc/releases/glibc-2.7.tar.gz"); })); + + for (int i = 0; i < download_futures.size(); ++i) + download_futures[i].wait(); return 0; }