Start of work towards throttling TCP connections
This commit is contained in:
parent
c63e598497
commit
ce7139c073
5 changed files with 277 additions and 6 deletions
|
|
@ -147,6 +147,7 @@ set( fc_sources
|
|||
src/network/http/http_connection.cpp
|
||||
src/network/http/http_server.cpp
|
||||
src/network/ip.cpp
|
||||
src/network/rate_limiting.cpp
|
||||
src/network/resolve.cpp
|
||||
src/network/url.cpp
|
||||
src/compress/smaz.cpp
|
||||
|
|
|
|||
|
|
@ -79,11 +79,17 @@ namespace asio {
|
|||
* @return the number of bytes read.
|
||||
*/
|
||||
template<typename AsyncReadStream, typename MutableBufferSequence>
|
||||
size_t read_some( AsyncReadStream& s, const MutableBufferSequence& buf )
|
||||
size_t read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
|
||||
{
|
||||
promise<size_t>::ptr p(new promise<size_t>("fc::asio::async_read_some"));
|
||||
s.async_read_some( buf, boost::bind( detail::read_write_handler, p, _1, _2 ) );
|
||||
return p->wait();
|
||||
promise<size_t>::ptr p(new promise<size_t>("fc::asio::async_read_some"));
|
||||
s.async_read_some(buf, boost::bind(detail::read_write_handler, p, _1, _2));
|
||||
return p->wait();
|
||||
}
|
||||
|
||||
template<typename AsyncReadStream, typename MutableBufferSequence>
|
||||
void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise<size_t>::ptr completion_promise)
|
||||
{
|
||||
s.async_read_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2));
|
||||
}
|
||||
|
||||
template<typename AsyncReadStream>
|
||||
|
|
@ -117,6 +123,16 @@ namespace asio {
|
|||
return p->wait();
|
||||
}
|
||||
|
||||
/**
|
||||
* @pre s.non_blocking() == true
|
||||
* @brief wraps boost::asio::async_write_some
|
||||
* @return the number of bytes written
|
||||
*/
|
||||
template<typename AsyncWriteStream, typename ConstBufferSequence>
|
||||
void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise<size_t>::ptr completion_promise) {
|
||||
s.async_write_some(buf, boost::bind(detail::read_write_handler, completion_promise, _1, _2));
|
||||
}
|
||||
|
||||
namespace tcp {
|
||||
typedef boost::asio::ip::tcp::endpoint endpoint;
|
||||
typedef boost::asio::ip::tcp::resolver::iterator resolver_iterator;
|
||||
|
|
|
|||
26
include/fc/network/rate_limiting.hpp
Normal file
26
include/fc/network/rate_limiting.hpp
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
#pragma once
|
||||
#include <fc/utility.hpp>
|
||||
#include <fc/fwd.hpp>
|
||||
#include <fc/io/iostream.hpp>
|
||||
#include <fc/time.hpp>
|
||||
|
||||
namespace fc
|
||||
{
|
||||
namespace detail
|
||||
{
|
||||
class rate_limiting_group_impl;
|
||||
}
|
||||
|
||||
class rate_limiting_group
|
||||
{
|
||||
public:
|
||||
rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second);
|
||||
~rate_limiting_group();
|
||||
|
||||
private:
|
||||
std::unique_ptr<detail::rate_limiting_group_impl> my;
|
||||
};
|
||||
typedef std::shared_ptr<rate_limiting_group> rate_limiting_group_ptr;
|
||||
|
||||
} // namesapce fc
|
||||
|
||||
228
src/network/rate_limiting.cpp
Normal file
228
src/network/rate_limiting.cpp
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
#include <fc/network/rate_limiting.hpp>
|
||||
#include <fc/network/tcp_socket.hpp>
|
||||
#include <fc/network/ip.hpp>
|
||||
#include <fc/fwd_impl.hpp>
|
||||
#include <fc/asio.hpp>
|
||||
#include <fc/log/logger.hpp>
|
||||
#include <fc/io/stdio.hpp>
|
||||
#include <fc/exception/exception.hpp>
|
||||
|
||||
namespace fc
|
||||
{
|
||||
|
||||
namespace detail
|
||||
{
|
||||
// data about a read or write we're managing
|
||||
class rate_limited_operation
|
||||
{
|
||||
public:
|
||||
size_t length;
|
||||
size_t permitted_length;
|
||||
promise<size_t>::ptr completion_promise;
|
||||
|
||||
rate_limited_operation(size_t length,
|
||||
promise<size_t>::ptr&& completion_promise) :
|
||||
length(length),
|
||||
permitted_length(0),
|
||||
completion_promise(completion_promise)
|
||||
{}
|
||||
|
||||
virtual void perform_operation() = 0;
|
||||
};
|
||||
|
||||
class rate_limited_tcp_write_operation : public rate_limited_operation
|
||||
{
|
||||
public:
|
||||
boost::asio::ip::tcp::socket& socket;
|
||||
const char* buffer;
|
||||
|
||||
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
|
||||
const char* buffer,
|
||||
size_t length,
|
||||
promise<size_t>::ptr completion_promise) :
|
||||
rate_limited_operation(length, std::move(completion_promise)),
|
||||
socket(socket),
|
||||
buffer(buffer)
|
||||
{}
|
||||
virtual void perform_operation() override
|
||||
{
|
||||
asio::async_write_some(socket,
|
||||
boost::asio::buffer(buffer, permitted_length),
|
||||
completion_promise);
|
||||
}
|
||||
};
|
||||
class rate_limited_tcp_read_operation : public rate_limited_operation
|
||||
{
|
||||
public:
|
||||
boost::asio::ip::tcp::socket& socket;
|
||||
char* buffer;
|
||||
|
||||
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
|
||||
char* buffer,
|
||||
size_t length,
|
||||
promise<size_t>::ptr completion_promise) :
|
||||
rate_limited_operation(length, std::move(completion_promise)),
|
||||
socket(socket),
|
||||
buffer(buffer)
|
||||
{}
|
||||
virtual void perform_operation() override
|
||||
{
|
||||
asio::async_read_some(socket,
|
||||
boost::asio::buffer(buffer, permitted_length),
|
||||
completion_promise);
|
||||
}
|
||||
};
|
||||
|
||||
struct is_operation_shorter
|
||||
{
|
||||
// less than operator designed to bring the shortest operations to the end
|
||||
bool operator()(const rate_limited_operation* lhs, const rate_limited_operation* rhs)
|
||||
{
|
||||
return lhs->length > rhs->length;
|
||||
}
|
||||
};
|
||||
|
||||
class rate_limiting_group_impl
|
||||
{
|
||||
public:
|
||||
uint32_t _upload_bytes_per_second;
|
||||
uint32_t _download_bytes_per_second;
|
||||
|
||||
microseconds _granularity;
|
||||
|
||||
typedef std::list<std::unique_ptr<rate_limited_operation> > rate_limited_operation_list;
|
||||
rate_limited_operation_list _read_operations_in_progress;
|
||||
rate_limited_operation_list _read_operations_for_next_iteration;
|
||||
rate_limited_operation_list _write_operations_in_progress;
|
||||
rate_limited_operation_list _write_operations_for_next_iteration;
|
||||
|
||||
time_point _last_write_iteration_time;
|
||||
|
||||
rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second);
|
||||
|
||||
size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length);
|
||||
size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buf, size_t len);
|
||||
|
||||
void process_pending_reads();
|
||||
void process_pending_writes();
|
||||
void process_pending_operations(rate_limited_operation_list& operations_in_progress,
|
||||
rate_limited_operation_list& operations_for_next_iteration);
|
||||
};
|
||||
|
||||
rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second) :
|
||||
_upload_bytes_per_second(upload_bytes_per_second),
|
||||
_download_bytes_per_second(download_bytes_per_second),
|
||||
_granularity(fc::milliseconds(50))
|
||||
{
|
||||
}
|
||||
|
||||
size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
|
||||
{
|
||||
promise<size_t>::ptr completion_promise(new promise<size_t>());
|
||||
_read_operations_for_next_iteration.emplace_back(std::make_unique<rate_limited_tcp_read_operation>(socket, buffer, length, completion_promise));
|
||||
return completion_promise->wait();
|
||||
}
|
||||
size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
|
||||
{
|
||||
if (_upload_bytes_per_second)
|
||||
{
|
||||
promise<size_t>::ptr completion_promise(new promise<size_t>());
|
||||
_write_operations_for_next_iteration.emplace_back(std::make_unique<rate_limited_tcp_write_operation>(socket, buffer, length, completion_promise));
|
||||
return completion_promise->wait();
|
||||
}
|
||||
else
|
||||
return asio::write_some(socket, boost::asio::buffer(buffer, length));
|
||||
}
|
||||
void rate_limiting_group_impl::process_pending_reads()
|
||||
{
|
||||
process_pending_operations(_read_operations_in_progress, _read_operations_for_next_iteration);
|
||||
}
|
||||
void rate_limiting_group_impl::process_pending_writes()
|
||||
{
|
||||
process_pending_operations(_write_operations_in_progress, _write_operations_for_next_iteration);
|
||||
}
|
||||
void rate_limiting_group_impl::process_pending_operations(rate_limited_operation_list& operations_in_progress,
|
||||
rate_limited_operation_list& operations_for_next_iteration)
|
||||
{
|
||||
// lock here for multithreaded
|
||||
std::copy(std::make_move_iterator(operations_for_next_iteration.begin()),
|
||||
std::make_move_iterator(operations_for_next_iteration.end()),
|
||||
std::back_inserter(operations_in_progress));
|
||||
operations_for_next_iteration.clear();
|
||||
|
||||
// find out how much time since our last write
|
||||
time_point this_write_iteration_start_time = time_point::now();
|
||||
if (_upload_bytes_per_second) // the we are limiting upload speed
|
||||
{
|
||||
microseconds time_since_last_iteration = this_write_iteration_start_time - _last_write_iteration_time;
|
||||
if (time_since_last_iteration > seconds(1))
|
||||
time_since_last_iteration = seconds(1);
|
||||
else if (time_since_last_iteration < microseconds(0))
|
||||
time_since_last_iteration = microseconds(0);
|
||||
|
||||
uint32_t total_bytes_for_this_iteration =
|
||||
(uint32_t)(time_since_last_iteration.count() / _upload_bytes_per_second / seconds(1).count());
|
||||
if (total_bytes_for_this_iteration)
|
||||
{
|
||||
// sort the pending writes in order of the number of bytes they need to write, smallest first
|
||||
std::vector<rate_limited_operation*> operations_sorted_by_length;
|
||||
operations_sorted_by_length.reserve(operations_in_progress.size());
|
||||
for (std::unique_ptr<rate_limited_operation>& operation_data : operations_in_progress)
|
||||
operations_sorted_by_length.push_back(operation_data.get());
|
||||
std::sort(operations_sorted_by_length.begin(), operations_sorted_by_length.end(), is_operation_shorter());
|
||||
|
||||
// figure out how many bytes each writer is allowed to write
|
||||
uint32_t bytes_remaining_to_allocate = total_bytes_for_this_iteration;
|
||||
while (!operations_sorted_by_length.empty())
|
||||
{
|
||||
uint32_t bytes_permitted_for_this_writer = bytes_remaining_to_allocate / operations_sorted_by_length.size();
|
||||
uint32_t bytes_allocated_for_this_writer = std::min(operations_sorted_by_length.back()->length, bytes_permitted_for_this_writer);
|
||||
operations_sorted_by_length.back()->permitted_length = bytes_allocated_for_this_writer;
|
||||
bytes_remaining_to_allocate -= bytes_allocated_for_this_writer;
|
||||
operations_sorted_by_length.pop_back();
|
||||
}
|
||||
|
||||
// kick off the writes in first-come order
|
||||
for (auto iter = operations_in_progress.begin(); iter != operations_in_progress.end();)
|
||||
{
|
||||
if ((*iter)->permitted_length > 0)
|
||||
{
|
||||
(*iter)->perform_operation();
|
||||
iter = operations_in_progress.erase(iter);
|
||||
}
|
||||
else
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
}
|
||||
else // upload speed is unlimited
|
||||
{
|
||||
// we shouldn't end up here often. If the rate is unlimited, we should just execute
|
||||
// the operation immediately without being queued up. This should only be hit if
|
||||
// we change from a limited rate to unlimited
|
||||
for (auto iter = operations_in_progress.begin();
|
||||
iter != operations_in_progress.end();
|
||||
++iter)
|
||||
{
|
||||
(*iter)->permitted_length = (*iter)->length;
|
||||
(*iter)->perform_operation();
|
||||
}
|
||||
operations_in_progress.clear();
|
||||
}
|
||||
_last_write_iteration_time = this_write_iteration_start_time;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
rate_limiting_group::rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second) :
|
||||
my(new detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second))
|
||||
{
|
||||
}
|
||||
|
||||
rate_limiting_group::~rate_limiting_group()
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
|
||||
} // namespace fc
|
||||
|
|
@ -77,8 +77,8 @@ namespace fc {
|
|||
#if defined _WIN32 || defined WIN32 || defined OS_WIN64 || defined _WIN64 || defined WIN64 || defined WINNT
|
||||
struct tcp_keepalive keepalive_settings;
|
||||
keepalive_settings.onoff = 1;
|
||||
keepalive_settings.keepalivetime = interval.count() / fc::milliseconds(1).count();
|
||||
keepalive_settings.keepaliveinterval = interval.count() / fc::milliseconds(1).count();
|
||||
keepalive_settings.keepalivetime = (ULONG)(interval.count() / fc::milliseconds(1).count());
|
||||
keepalive_settings.keepaliveinterval = (ULONG)(interval.count() / fc::milliseconds(1).count());
|
||||
|
||||
DWORD dwBytesRet = 0;
|
||||
if (WSAIoctl(my->_sock.native(), SIO_KEEPALIVE_VALS, &keepalive_settings, sizeof(keepalive_settings),
|
||||
|
|
|
|||
Loading…
Reference in a new issue