SON conn. pool

This commit is contained in:
timur 2022-11-28 06:38:50 -04:00
parent 94b518cb7a
commit 922327f98c
6 changed files with 136 additions and 34 deletions

View file

@ -1,4 +1,4 @@
#include <graphene/peerplays_sidechain/common/rpc_client.hpp>
#include <graphene/peerplays_sidechain/common/rpc_connection.hpp>
#include <regex>
#include <sstream>
@ -18,7 +18,42 @@
namespace graphene { namespace peerplays_sidechain {
rpc_client::rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls) :
struct rpc_reply {
uint16_t status;
std::string body;
};
class rpc_connection {
public:
rpc_connection(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls);
protected:
std::string retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx);
std::string retrieve_value_from_reply(std::string reply_str, std::string value_path);
std::string send_post_request(std::string method, std::string params, bool show_log);
std::string url;
std::string user;
std::string password;
bool debug_rpc_calls;
std::string protocol;
std::string host;
std::string port;
std::string target;
std::string authorization;
uint32_t request_id;
private:
rpc_reply send_post_request(std::string body, bool show_log);
boost::beast::net::io_context ioc;
boost::beast::net::ip::tcp::resolver resolver;
boost::asio::ip::basic_resolver_results<boost::asio::ip::tcp> results;
};
rpc_connection::rpc_connection(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls) :
url(_url),
user(_user),
password(_password),
@ -61,7 +96,7 @@ rpc_client::rpc_client(std::string _url, std::string _user, std::string _passwor
}
}
std::string rpc_client::retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx) {
std::string rpc_connection::retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx) {
if (reply_str.empty()) {
wlog("RPC call ${function}, empty reply string", ("function", __FUNCTION__));
return "";
@ -98,7 +133,7 @@ std::string rpc_client::retrieve_array_value_from_reply(std::string reply_str, s
return "";
}
std::string rpc_client::retrieve_value_from_reply(std::string reply_str, std::string value_path) {
std::string rpc_connection::retrieve_value_from_reply(std::string reply_str, std::string value_path) {
if (reply_str.empty()) {
wlog("RPC call ${function}, empty reply string", ("function", __FUNCTION__));
return "";
@ -125,7 +160,7 @@ std::string rpc_client::retrieve_value_from_reply(std::string reply_str, std::st
return "";
}
std::string rpc_client::send_post_request(std::string method, std::string params, bool show_log) {
std::string rpc_connection::send_post_request(std::string method, std::string params, bool show_log) {
std::stringstream body;
request_id = request_id + 1;
@ -164,7 +199,7 @@ std::string rpc_client::send_post_request(std::string method, std::string params
return "";
}
rpc_reply rpc_client::send_post_request(std::string body, bool show_log) {
rpc_reply rpc_connection::send_post_request(std::string body, bool show_log) {
// These object is used as a context for ssl connection
boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
@ -247,4 +282,71 @@ rpc_reply rpc_client::send_post_request(std::string body, bool show_log) {
return reply;
}
rpc_client::rpc_client(const std::vector<std::string> &_urls, const std::vector<std::string> &_users, const std::vector<std::string> &_passwords, bool _debug_rpc_calls)
{
FC_ASSERT(_urls.size());
FC_ASSERT(_users.size() == _urls.size() && _passwords.size() == _urls.size());
for (size_t i=0; i < _urls.size(); i++)
connections.push_back(new rpc_connection(_urls[i], _users[i], _passwords[i], _debug_rpc_calls));
n_active_conn = 0;
}
void rpc_client::reselect_connection()
{
//ilog("n_active_rpc_client=${n}", ("n", n_active_rpc_client));
FC_ASSERT(connections.size());
int best_n = -1;
int best_quality = -1;
std::vector<uint64_t> head_block_numbers;
head_block_numbers.resize(rpc_clients.size());
for (size_t n=0; n < connections.size(); n++) {
rpc_connection *conn = connections[n];
int quality = 0;
head_block_numbers[n] = std::numeric_limits<uint64_t>::max();
// make the ping
fc::time_point t_sent = fc::time_point::now();
uint64_t head_block_number = ping(*conn);
fc::time_point t_received = fc::time_point::now();
if (head_block_number != std::numeric_limits<uint64_t>::max()) {
int t = (t_received - t_sent).count();
t += rand() % 10;
FC_ASSERT(t != -1);
head_block_numbers[n] = head_block_number;
static const int t_limit = 10*1000000; // 10 sec
if (t < t_limit)
quality = t_limit - t; // the less time, the higher quality
// look for the best quality
if (quality > best_quality) {
best_n = n;
best_quality = quality;
}
}
FC_ASSERT(best_n != -1 && best_quality != -1);
if (best_n != n_active_conn) { // if the best client is not the current one, ...
uint64_t active_head_block_number = head_block_numbers[n_active_conn];
if (active_head_block_number == std::numeric_limits<uint64_t>::max() // ... and the current one has no known head block...
|| head_block_numbers[best_n] >= active_head_block_number) { // ...or the best client's head is more recent than the current, ...
n_active_conn = best_n; // ...then select new one
ilog("!!! rpc connection reselected, now ${n}", ("n", n_active_conn));
}
}
}
}
rpc_connection &rpc_client::get_active_connection() const
{
return *connections[n_active_conn];
}
std::string rpc_client::send_post_request(std::string method, std::string params, bool show_log)
{
return get_active_connection().send_post_request(method, params, show_log);
}
}} // namespace graphene::peerplays_sidechain

View file

@ -8,39 +8,21 @@
namespace graphene { namespace peerplays_sidechain {
struct rpc_reply {
uint16_t status;
std::string body;
};
class rpc_connection;
class rpc_client {
public:
rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls);
rpc_client(const std::vector<std::string> &_urls, const std::vector<std::string> &_users, const std::vector<std::string> &_passwords, bool _debug_rpc_calls);
protected:
std::string retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx);
std::string retrieve_value_from_reply(std::string reply_str, std::string value_path);
std::string send_post_request(std::string method, std::string params, bool show_log);
std::string url;
std::string user;
std::string password;
bool debug_rpc_calls;
std::string protocol;
std::string host;
std::string port;
std::string target;
std::string authorization;
uint32_t request_id;
private:
rpc_reply send_post_request(std::string body, bool show_log);
std::vector<rpc_connection*> connections;
int n_active_conn;
boost::beast::net::io_context ioc;
boost::beast::net::ip::tcp::resolver resolver;
boost::asio::ip::basic_resolver_results<boost::asio::ip::tcp> results;
rpc_connection &get_active_connection() const;
virtual uint64_t ping(rpc_connection &conn) const = 0;
};
}} // namespace graphene::peerplays_sidechain

View file

@ -57,6 +57,7 @@ public:
std::string sendrawtransaction(const std::string &tx_hex);
std::string walletlock();
bool walletpassphrase(const std::string &passphrase, uint32_t timeout = 60);
uint64_t ping(rpc_connection &conn) const;
private:
std::string ip;

View file

@ -30,6 +30,7 @@ public:
std::string get_head_block_time();
std::string get_is_test_net();
std::string get_last_irreversible_block_num();
uint64_t ping(rpc_connection &conn) const;
};
class sidechain_net_handler_hive : public sidechain_net_handler {
@ -54,9 +55,7 @@ private:
std::string rpc_password;
std::string wallet_account_name;
std::vector<hive_rpc_client*> rpc_clients;
int n_active_rpc_client;
hive_rpc_client *rpc_client;
hive::chain_id_type chain_id;
hive::network network_type;
@ -66,7 +65,6 @@ private:
void schedule_hive_listener();
void hive_listener_loop();
void handle_event(const std::string &event_data);
hive_rpc_client *get_active_rpc_client();
};
}} // namespace graphene::peerplays_sidechain

View file

@ -244,6 +244,11 @@ bool bitcoin_rpc_client::walletpassphrase(const std::string &passphrase, uint32_
return true;
}
uint64_t bitcoin_rpc_client::ping(rpc_connection &conn) const
{
return std::numeric_limits<uint64_t>::max();
}
// =============================================================================
zmq_listener::zmq_listener(std::string _ip, uint32_t _zmq) :

View file

@ -112,6 +112,20 @@ std::string hive_rpc_client::get_last_irreversible_block_num() {
return retrieve_value_from_reply(reply_str, "last_irreversible_block_num");
}
uint64_t hive_rpc_client::ping(rpc_connection &conn) const
{
const std::string reply = conn.send_post_request("database_api.get_dynamic_global_properties", "", debug_rpc_calls);
if (!reply.empty()) {
std::stringstream ss(reply);
boost::property_tree::ptree json;
boost::property_tree::read_json(ss, json);
if (json.count("result")) {
return json.get<uint64_t>("result.head_block_number");
}
}
return std::numeric_limits<uint64_t>::max();
}
sidechain_net_handler_hive::sidechain_net_handler_hive(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) :
sidechain_net_handler(_plugin, options) {
sidechain = sidechain_type::hive;