Polling trx from pool

This commit is contained in:
hirunda 2023-06-12 11:33:40 +02:00
parent 8af1c851e6
commit 632f74b000
4 changed files with 74 additions and 18 deletions

View file

@ -56,6 +56,11 @@ libbitcoin_client::libbitcoin_client(std::string url) :
}
libbitcoin_client::~libbitcoin_client() {
if( poller_trx_thr.joinable()) {
stop_poller_trx_thread = true;
poller_transacion_done.notify_all();
poller_trx_thr.join();
}
stop = true;
sub_thr.detach();
}
@ -152,7 +157,9 @@ std::vector<list_unspent_replay> libbitcoin_client::listunspent(std::string addr
bool libbitcoin_client::get_is_test_net() {
bool result = false;
if(is_test_net != -1) {
return is_test_net;
}
auto error_handler = [&](const std::error_code &ec) {
elog("error on fetching genesis block ${error_code}", ("error_code", ec.message()));
@ -161,14 +168,16 @@ bool libbitcoin_client::get_is_test_net() {
auto block_header_handler = [&](const libbitcoin::chain::header &block_header) {
std::string hash_str = libbitcoin::config::hash256(block_header.hash()).to_string();
if (hash_str == GENESIS_TESTNET_HASH || hash_str == GENESIS_REGTEST_HASH) {
result = true;
is_test_net = 1;
} else {
is_test_net = 0;
}
};
obelisk_client.blockchain_fetch_block_header(error_handler, block_header_handler, 0);
obelisk_client.wait();
return result;
return is_test_net;
}
uint64_t libbitcoin_client::get_fee_from_trx(libbitcoin::chain::transaction trx) {
@ -251,8 +260,32 @@ void libbitcoin_client::subscribe_to_address(const std::string address_str, addr
this->address_updated_callback_handler = address_updated_callback_handler;
this->subcription_expired_callback_handler = subcription_expired_callback_handler;
sub_thr = std::thread(&libbitcoin_client::subscription_thr, this);
poller_trx_thr = std::thread(&libbitcoin_client::poller_transaction_thr, this);
}
void libbitcoin_client::poller_transaction_thr() {
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
while(!stop_poller_trx_thread) {
libbitcoin::chain::transaction trx;
if (!target_trxs_pool.empty() && get_transaction(libbitcoin::config::hash256(target_trxs_pool.back()).to_string(), trx)) {
target_trxs_pool.pop_back();
address_updated_callback_handler(trx, get_is_test_net());
}
poller_transacion_done.wait_for(lck, std::chrono::minutes(1));
}
wlog("Exit from poller_transaction_thr() ...");
}
bool libbitcoin_client::is_target_trxs_pool_empty() {
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
return target_trxs_pool.empty();
}
void libbitcoin_client::subscription_thr() {
libbitcoin::wallet::payment_address address(subscription_add);
@ -270,8 +303,11 @@ void libbitcoin_client::subscription_thr() {
if (!error.value()) {
wlog("sequence: ${sequence}, height: ${height}, hash: ${hash}", ("sequence", sequence)("height", height)("hash", libbitcoin::config::hash256(tx_hash).to_string()));
libbitcoin::chain::transaction trx;
if (get_transaction(libbitcoin::config::hash256(tx_hash).to_string(), trx)) {
address_updated_callback_handler(trx);
if (height == 0) {
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
target_trxs_pool.emplace_back(tx_hash);
} else if((get_transaction(libbitcoin::config::hash256(tx_hash).to_string(), trx))){
address_updated_callback_handler(trx, get_is_test_net());
}
}
};
@ -282,6 +318,12 @@ void libbitcoin_client::subscription_thr() {
obelisk_client.monitor(SUBSCRIBE_TIME_DURATION);
if( poller_trx_thr.joinable()) {
stop_poller_trx_thread = true;
poller_transacion_done.notify_all();
poller_trx_thr.join();
}
if (!stop) {
ilog("Subsription monitor expired, renew if needed ...");
subcription_expired_callback_handler(subscription_add);

View file

@ -12,7 +12,7 @@
#define DEAFULT_LIBBITCOIN_TRX_FEE (20000)
#define MAX_TRXS_IN_MEMORY_POOL (30000)
#define MIN_TRXS_IN_BUCKET (100)
#define SUBSCRIBE_TIME_DURATION (2 * 60)
#define SUBSCRIBE_TIME_DURATION (30)
#define GENESIS_MAINNET_HASH "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
#define GENESIS_TESTNET_HASH "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
@ -23,7 +23,7 @@ namespace graphene { namespace peerplays_sidechain {
typedef std::function<void(const libbitcoin::chain::block &)>
block_update_handler;
typedef std::function<void(const libbitcoin::chain::transaction& trx)>
typedef std::function<void(const libbitcoin::chain::transaction& trx, const bool& is_test_net)>
address_update_handler;
typedef std::function<void(const std::string& )>
@ -48,9 +48,11 @@ public:
bool get_is_test_net();
void subscribe_to_address(const std::string address_str, address_update_handler address_updated_callback_handler,
subscription_expired_handler subcription_expired_callback_handler);
bool is_target_trxs_pool_empty();
private:
void subscription_thr();
void poller_transaction_thr();
libbitcoin::client::obelisk_client obelisk_client;
libbitcoin::protocol::zmq::identifier id;
@ -63,9 +65,16 @@ private:
std::string port;
std::string url;
std::vector<libbitcoin::hash_digest> target_trxs_pool;
std::mutex trxs_pool_mutex;
std::condition_variable poller_transacion_done;
bool is_connected = false;
int16_t is_test_net = -1;
std::thread sub_thr;
std::thread poller_trx_thr;
bool stop = false;
bool stop_poller_trx_thread = false;
};
}} // namespace graphene::peerplays_sidechain

View file

@ -255,8 +255,8 @@ private:
void on_changed_objects(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
void on_changed_objects_cb(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
std::map<std::string, std::unique_ptr<libbitcoin_client>> libbitcoin_clients;
void trx_event(const libbitcoin::chain::transaction &trx);
std::map<std::string, std::pair<std::unique_ptr<libbitcoin_client>, bool>> libbitcoin_clients;
void trx_event(const libbitcoin::chain::transaction &trx, const bool& is_test_net);
bool stop_sub_thr = true;

View file

@ -1668,13 +1668,17 @@ void sidechain_net_handler_bitcoin::subscribe_address_thread() {
std::for_each(sidechain_addresses_by_sidechain_range.first, sidechain_addresses_by_sidechain_range.second,
[&](const sidechain_address_object &sao) {
scoped_lock interlock(event_handler_mutex);
if (!sao.deposit_address.empty() && !libbitcoin_clients[sao.deposit_address]) {
if (!sao.deposit_address.empty() && (!libbitcoin_clients[sao.deposit_address].first || !libbitcoin_clients[sao.deposit_address].second)) {
if (sao.expires > database.head_block_time()) {
libbitcoin_clients[sao.deposit_address] = std::unique_ptr<libbitcoin_client>(new libbitcoin_client(libbitcoin_server_ip));
auto trx_event_callback = std::bind(&sidechain_net_handler_bitcoin::trx_event, this, std::placeholders::_1);
if (!libbitcoin_clients[sao.deposit_address].first) {
libbitcoin_clients[sao.deposit_address] = std::make_pair(std::unique_ptr<libbitcoin_client>(new libbitcoin_client(libbitcoin_server_ip)), true);
}
auto trx_event_callback = std::bind(&sidechain_net_handler_bitcoin::trx_event, this, std::placeholders::_1, std::placeholders::_2);
auto sub_expired_callback = std::bind(&sidechain_net_handler_bitcoin::subscription_expired_event, this, std::placeholders::_1);
libbitcoin_clients[sao.deposit_address]->subscribe_to_address(sao.deposit_address, trx_event_callback, sub_expired_callback);
libbitcoin_clients[sao.deposit_address].first->subscribe_to_address(sao.deposit_address, trx_event_callback, sub_expired_callback);
libbitcoin_clients[sao.deposit_address].second = true;
}
}
});
@ -1687,14 +1691,15 @@ void sidechain_net_handler_bitcoin::subscribe_address_thread() {
void sidechain_net_handler_bitcoin::subscription_expired_event(const std::string &address) {
scoped_lock interlock(event_handler_mutex);
// we just delete the slot for address on which subscription expired in the different
// thread which is triggered on every 10s we will renew the subscription if needed
if (libbitcoin_clients[address]) {
if (libbitcoin_clients[address].first && libbitcoin_clients[address].first->is_target_trxs_pool_empty()) {
libbitcoin_clients.erase(address);
libbitcoin_clients[address].first = nullptr;
} else {
libbitcoin_clients[address].second = false;
}
}
void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transaction& trx) {
void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transaction& trx, const bool& is_test_net) {
scoped_lock interlock(event_handler_mutex);
std::vector<info_for_vin> result;
uint32_t vout_seq = 0;
@ -1702,7 +1707,7 @@ void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transacti
std::vector<std::string> address_list;
libbitcoin::wallet::payment_address::list addresses;
if (/*is_test_net*/true) {
if (is_test_net) {
addresses = o.addresses(libbitcoin::wallet::payment_address::testnet_p2kh,
libbitcoin::wallet::payment_address::testnet_p2sh);
} else {