Add counters to the TCP rate limiter to measure actual upload and download speed, and allow the caller to set how bursty they want the connection to be.

This commit is contained in:
Eric Frias 2014-06-25 18:16:58 -04:00
parent 8008334150
commit 2c5c1655a6
3 changed files with 121 additions and 13 deletions

View file

@ -3,6 +3,8 @@
#include <memory>
#include <fc/time.hpp>
namespace fc
{
namespace detail
@ -15,7 +17,7 @@ namespace fc
class rate_limiting_group
{
public:
rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second);
rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds = 1);
~rate_limiting_group();
void set_upload_limit(uint32_t upload_bytes_per_second);
@ -24,6 +26,10 @@ namespace fc
void set_download_limit(uint32_t download_bytes_per_second);
uint32_t get_download_limit() const;
uint32_t get_actual_upload_rate() const;
uint32_t get_actual_download_rate() const;
void set_actual_rate_time_constant(microseconds time_constant);
void add_tcp_socket(tcp_socket* tcp_socket_to_limit);
void remove_tcp_socket(tcp_socket* tcp_socket_to_stop_limiting);
private:

View file

@ -86,11 +86,69 @@ namespace fc
}
};
class average_rate_meter
{
private:
mutable double _average_rate;
mutable uint32_t _unaccounted_bytes;
mutable time_point _last_update_time;
microseconds _time_constant;
void update_const(uint32_t bytes_transferred = 0) const;
public:
average_rate_meter(const microseconds& time_constant = seconds(10));
void set_time_constant(const microseconds& time_constant);
void update(uint32_t bytes_transferred = 0);
uint32_t get_average_rate() const;
};
average_rate_meter::average_rate_meter(const microseconds& time_constant) :
_average_rate(0.),
_unaccounted_bytes(0),
_last_update_time(time_point_sec::min()),
_time_constant(time_constant)
{}
void average_rate_meter::set_time_constant(const microseconds& time_constant)
{
_time_constant = time_constant;
}
void average_rate_meter::update(uint32_t bytes_transferred /* = 0 */)
{
update_const(bytes_transferred);
}
void average_rate_meter::update_const(uint32_t bytes_transferred /* = 0 */) const
{
time_point now = time_point::now();
if (now <= _last_update_time)
_unaccounted_bytes += bytes_transferred;
else
{
microseconds time_since_last_update = now - _last_update_time;
if (time_since_last_update > _time_constant)
_average_rate = bytes_transferred / (_time_constant.count() / (double)seconds(1).count());
else
{
bytes_transferred += _unaccounted_bytes;
double seconds_since_last_update = time_since_last_update.count() / (double)seconds(1).count();
double rate_since_last_update = bytes_transferred / seconds_since_last_update;
double alpha = time_since_last_update.count() / (double)_time_constant.count();
_average_rate = rate_since_last_update * alpha + _average_rate * (1.0 - alpha);
}
_last_update_time = now;
_unaccounted_bytes = 0;
}
}
uint32_t average_rate_meter::get_average_rate() const
{
update_const();
return (uint32_t)_average_rate;
}
class rate_limiting_group_impl : public tcp_socket_io_hooks
{
public:
uint32_t _upload_bytes_per_second;
uint32_t _download_bytes_per_second;
uint32_t _burstiness_in_seconds;
microseconds _granularity; // how often to add tokens to the bucket
uint32_t _read_tokens;
@ -112,7 +170,11 @@ namespace fc
future<void> _process_pending_writes_loop_complete;
promise<void>::ptr _new_write_operation_available_promise;
rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second);
average_rate_meter _actual_upload_rate;
average_rate_meter _actual_download_rate;
rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second,
uint32_t burstiness_in_seconds = 1);
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
@ -127,9 +189,11 @@ namespace fc
uint32_t& unused_tokens);
};
rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second) :
rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second,
uint32_t burstiness_in_seconds) :
_upload_bytes_per_second(upload_bytes_per_second),
_download_bytes_per_second(download_bytes_per_second),
_burstiness_in_seconds(burstiness_in_seconds),
_granularity(milliseconds(50)),
_read_tokens(_download_bytes_per_second),
_unused_read_tokens(0),
@ -140,6 +204,7 @@ namespace fc
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
{
size_t bytes_read;
if (_download_bytes_per_second)
{
promise<size_t>::ptr completion_promise(new promise<size_t>());
@ -152,15 +217,19 @@ namespace fc
else if (_new_read_operation_available_promise)
_new_read_operation_available_promise->set_value();
size_t bytes_read = completion_promise->wait();
bytes_read = completion_promise->wait();
_unused_read_tokens += read_operation.permitted_length - bytes_read;
return bytes_read;
}
else
return asio::read_some(socket, boost::asio::buffer(buffer, length));
bytes_read = asio::read_some(socket, boost::asio::buffer(buffer, length));
_actual_download_rate.update(bytes_read);
return bytes_read;
}
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
{
size_t bytes_written;
if (_upload_bytes_per_second)
{
promise<size_t>::ptr completion_promise(new promise<size_t>());
@ -173,12 +242,15 @@ namespace fc
else if (_new_write_operation_available_promise)
_new_write_operation_available_promise->set_value();
size_t bytes_written = completion_promise->wait();
bytes_written = completion_promise->wait();
_unused_write_tokens += write_operation.permitted_length - bytes_written;
return bytes_written;
}
else
return asio::write_some(socket, boost::asio::buffer(buffer, length));
bytes_written = asio::write_some(socket, boost::asio::buffer(buffer, length));
_actual_upload_rate.update(bytes_written);
return bytes_written;
}
void rate_limiting_group_impl::process_pending_reads()
{
@ -248,7 +320,7 @@ namespace fc
tokens += (uint32_t)((limit_bytes_per_second * time_since_last_iteration.count()) / 1000000);
tokens += unused_tokens;
unused_tokens = 0;
tokens = std::min(tokens, limit_bytes_per_second);
tokens = std::min(tokens, limit_bytes_per_second * _burstiness_in_seconds);
if (tokens)
{
@ -303,8 +375,8 @@ namespace fc
}
rate_limiting_group::rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second) :
my(new detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second))
rate_limiting_group::rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds /* = 1 */) :
my(new detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second, burstiness_in_seconds))
{
}
@ -312,6 +384,22 @@ namespace fc
{
}
uint32_t rate_limiting_group::get_actual_upload_rate() const
{
return my->_actual_upload_rate.get_average_rate();
}
uint32_t rate_limiting_group::get_actual_download_rate() const
{
return my->_actual_download_rate.get_average_rate();
}
void rate_limiting_group::set_actual_rate_time_constant(microseconds time_constant)
{
my->_actual_upload_rate.set_time_constant(time_constant);
my->_actual_download_rate.set_time_constant(time_constant);
}
void rate_limiting_group::set_upload_limit(uint32_t upload_bytes_per_second)
{
my->_upload_bytes_per_second = upload_bytes_per_second;

View file

@ -24,11 +24,25 @@ void download_url(const std::string& ip_address, const std::string& url)
int main( int argc, char** argv )
{
rate_limiter.set_actual_rate_time_constant(fc::seconds(1));
std::vector<fc::future<void> > download_futures;
download_futures.push_back(fc::async([](){ download_url("198.82.184.145", "http://mirror.cs.vt.edu/pub/cygwin/glibc/releases/glibc-2.9.tar.gz"); }));
download_futures.push_back(fc::async([](){ download_url("198.82.184.145", "http://mirror.cs.vt.edu/pub/cygwin/glibc/releases/glibc-2.7.tar.gz"); }));
for (int i = 0; i < download_futures.size(); ++i)
while (1)
{
bool all_done = true;
for (unsigned i = 0; i < download_futures.size(); ++i)
if (!download_futures[i].ready())
all_done = false;
if (all_done)
break;
std::cout << "Current measurement of actual transfer rate: upload " << rate_limiter.get_actual_upload_rate() << ", download " << rate_limiter.get_actual_download_rate() << "\n";
fc::usleep(fc::seconds(1));
}
for (unsigned i = 0; i < download_futures.size(); ++i)
download_futures[i].wait();
return 0;
}