Polling trx from pool
This commit is contained in:
parent
8af1c851e6
commit
632f74b000
4 changed files with 74 additions and 18 deletions
|
|
@ -56,6 +56,11 @@ libbitcoin_client::libbitcoin_client(std::string url) :
|
|||
}
|
||||
|
||||
libbitcoin_client::~libbitcoin_client() {
|
||||
if( poller_trx_thr.joinable()) {
|
||||
stop_poller_trx_thread = true;
|
||||
poller_transacion_done.notify_all();
|
||||
poller_trx_thr.join();
|
||||
}
|
||||
stop = true;
|
||||
sub_thr.detach();
|
||||
}
|
||||
|
|
@ -152,7 +157,9 @@ std::vector<list_unspent_replay> libbitcoin_client::listunspent(std::string addr
|
|||
|
||||
bool libbitcoin_client::get_is_test_net() {
|
||||
|
||||
bool result = false;
|
||||
if(is_test_net != -1) {
|
||||
return is_test_net;
|
||||
}
|
||||
|
||||
auto error_handler = [&](const std::error_code &ec) {
|
||||
elog("error on fetching genesis block ${error_code}", ("error_code", ec.message()));
|
||||
|
|
@ -161,14 +168,16 @@ bool libbitcoin_client::get_is_test_net() {
|
|||
auto block_header_handler = [&](const libbitcoin::chain::header &block_header) {
|
||||
std::string hash_str = libbitcoin::config::hash256(block_header.hash()).to_string();
|
||||
if (hash_str == GENESIS_TESTNET_HASH || hash_str == GENESIS_REGTEST_HASH) {
|
||||
result = true;
|
||||
is_test_net = 1;
|
||||
} else {
|
||||
is_test_net = 0;
|
||||
}
|
||||
};
|
||||
|
||||
obelisk_client.blockchain_fetch_block_header(error_handler, block_header_handler, 0);
|
||||
|
||||
obelisk_client.wait();
|
||||
return result;
|
||||
return is_test_net;
|
||||
}
|
||||
|
||||
uint64_t libbitcoin_client::get_fee_from_trx(libbitcoin::chain::transaction trx) {
|
||||
|
|
@ -251,8 +260,32 @@ void libbitcoin_client::subscribe_to_address(const std::string address_str, addr
|
|||
this->address_updated_callback_handler = address_updated_callback_handler;
|
||||
this->subcription_expired_callback_handler = subcription_expired_callback_handler;
|
||||
sub_thr = std::thread(&libbitcoin_client::subscription_thr, this);
|
||||
poller_trx_thr = std::thread(&libbitcoin_client::poller_transaction_thr, this);
|
||||
}
|
||||
|
||||
void libbitcoin_client::poller_transaction_thr() {
|
||||
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
|
||||
|
||||
while(!stop_poller_trx_thread) {
|
||||
|
||||
libbitcoin::chain::transaction trx;
|
||||
if (!target_trxs_pool.empty() && get_transaction(libbitcoin::config::hash256(target_trxs_pool.back()).to_string(), trx)) {
|
||||
target_trxs_pool.pop_back();
|
||||
address_updated_callback_handler(trx, get_is_test_net());
|
||||
}
|
||||
|
||||
poller_transacion_done.wait_for(lck, std::chrono::minutes(1));
|
||||
}
|
||||
|
||||
wlog("Exit from poller_transaction_thr() ...");
|
||||
}
|
||||
|
||||
bool libbitcoin_client::is_target_trxs_pool_empty() {
|
||||
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
|
||||
return target_trxs_pool.empty();
|
||||
}
|
||||
|
||||
|
||||
void libbitcoin_client::subscription_thr() {
|
||||
|
||||
libbitcoin::wallet::payment_address address(subscription_add);
|
||||
|
|
@ -270,8 +303,11 @@ void libbitcoin_client::subscription_thr() {
|
|||
if (!error.value()) {
|
||||
wlog("sequence: ${sequence}, height: ${height}, hash: ${hash}", ("sequence", sequence)("height", height)("hash", libbitcoin::config::hash256(tx_hash).to_string()));
|
||||
libbitcoin::chain::transaction trx;
|
||||
if (get_transaction(libbitcoin::config::hash256(tx_hash).to_string(), trx)) {
|
||||
address_updated_callback_handler(trx);
|
||||
if (height == 0) {
|
||||
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
|
||||
target_trxs_pool.emplace_back(tx_hash);
|
||||
} else if((get_transaction(libbitcoin::config::hash256(tx_hash).to_string(), trx))){
|
||||
address_updated_callback_handler(trx, get_is_test_net());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -282,6 +318,12 @@ void libbitcoin_client::subscription_thr() {
|
|||
|
||||
obelisk_client.monitor(SUBSCRIBE_TIME_DURATION);
|
||||
|
||||
if( poller_trx_thr.joinable()) {
|
||||
stop_poller_trx_thread = true;
|
||||
poller_transacion_done.notify_all();
|
||||
poller_trx_thr.join();
|
||||
}
|
||||
|
||||
if (!stop) {
|
||||
ilog("Subsription monitor expired, renew if needed ...");
|
||||
subcription_expired_callback_handler(subscription_add);
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
#define DEAFULT_LIBBITCOIN_TRX_FEE (20000)
|
||||
#define MAX_TRXS_IN_MEMORY_POOL (30000)
|
||||
#define MIN_TRXS_IN_BUCKET (100)
|
||||
#define SUBSCRIBE_TIME_DURATION (2 * 60)
|
||||
#define SUBSCRIBE_TIME_DURATION (30)
|
||||
|
||||
#define GENESIS_MAINNET_HASH "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
|
||||
#define GENESIS_TESTNET_HASH "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
|
||||
|
|
@ -23,7 +23,7 @@ namespace graphene { namespace peerplays_sidechain {
|
|||
typedef std::function<void(const libbitcoin::chain::block &)>
|
||||
block_update_handler;
|
||||
|
||||
typedef std::function<void(const libbitcoin::chain::transaction& trx)>
|
||||
typedef std::function<void(const libbitcoin::chain::transaction& trx, const bool& is_test_net)>
|
||||
address_update_handler;
|
||||
|
||||
typedef std::function<void(const std::string& )>
|
||||
|
|
@ -48,9 +48,11 @@ public:
|
|||
bool get_is_test_net();
|
||||
void subscribe_to_address(const std::string address_str, address_update_handler address_updated_callback_handler,
|
||||
subscription_expired_handler subcription_expired_callback_handler);
|
||||
bool is_target_trxs_pool_empty();
|
||||
|
||||
private:
|
||||
void subscription_thr();
|
||||
void poller_transaction_thr();
|
||||
|
||||
libbitcoin::client::obelisk_client obelisk_client;
|
||||
libbitcoin::protocol::zmq::identifier id;
|
||||
|
|
@ -63,9 +65,16 @@ private:
|
|||
std::string port;
|
||||
std::string url;
|
||||
|
||||
std::vector<libbitcoin::hash_digest> target_trxs_pool;
|
||||
std::mutex trxs_pool_mutex;
|
||||
std::condition_variable poller_transacion_done;
|
||||
|
||||
bool is_connected = false;
|
||||
int16_t is_test_net = -1;
|
||||
std::thread sub_thr;
|
||||
std::thread poller_trx_thr;
|
||||
bool stop = false;
|
||||
bool stop_poller_trx_thread = false;
|
||||
};
|
||||
|
||||
}} // namespace graphene::peerplays_sidechain
|
||||
|
|
@ -255,8 +255,8 @@ private:
|
|||
void on_changed_objects(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
|
||||
void on_changed_objects_cb(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
|
||||
|
||||
std::map<std::string, std::unique_ptr<libbitcoin_client>> libbitcoin_clients;
|
||||
void trx_event(const libbitcoin::chain::transaction &trx);
|
||||
std::map<std::string, std::pair<std::unique_ptr<libbitcoin_client>, bool>> libbitcoin_clients;
|
||||
void trx_event(const libbitcoin::chain::transaction &trx, const bool& is_test_net);
|
||||
|
||||
bool stop_sub_thr = true;
|
||||
|
||||
|
|
|
|||
|
|
@ -1668,13 +1668,17 @@ void sidechain_net_handler_bitcoin::subscribe_address_thread() {
|
|||
std::for_each(sidechain_addresses_by_sidechain_range.first, sidechain_addresses_by_sidechain_range.second,
|
||||
[&](const sidechain_address_object &sao) {
|
||||
scoped_lock interlock(event_handler_mutex);
|
||||
if (!sao.deposit_address.empty() && !libbitcoin_clients[sao.deposit_address]) {
|
||||
if (!sao.deposit_address.empty() && (!libbitcoin_clients[sao.deposit_address].first || !libbitcoin_clients[sao.deposit_address].second)) {
|
||||
|
||||
if (sao.expires > database.head_block_time()) {
|
||||
libbitcoin_clients[sao.deposit_address] = std::unique_ptr<libbitcoin_client>(new libbitcoin_client(libbitcoin_server_ip));
|
||||
auto trx_event_callback = std::bind(&sidechain_net_handler_bitcoin::trx_event, this, std::placeholders::_1);
|
||||
if (!libbitcoin_clients[sao.deposit_address].first) {
|
||||
libbitcoin_clients[sao.deposit_address] = std::make_pair(std::unique_ptr<libbitcoin_client>(new libbitcoin_client(libbitcoin_server_ip)), true);
|
||||
}
|
||||
|
||||
auto trx_event_callback = std::bind(&sidechain_net_handler_bitcoin::trx_event, this, std::placeholders::_1, std::placeholders::_2);
|
||||
auto sub_expired_callback = std::bind(&sidechain_net_handler_bitcoin::subscription_expired_event, this, std::placeholders::_1);
|
||||
libbitcoin_clients[sao.deposit_address]->subscribe_to_address(sao.deposit_address, trx_event_callback, sub_expired_callback);
|
||||
libbitcoin_clients[sao.deposit_address].first->subscribe_to_address(sao.deposit_address, trx_event_callback, sub_expired_callback);
|
||||
libbitcoin_clients[sao.deposit_address].second = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
@ -1687,14 +1691,15 @@ void sidechain_net_handler_bitcoin::subscribe_address_thread() {
|
|||
|
||||
void sidechain_net_handler_bitcoin::subscription_expired_event(const std::string &address) {
|
||||
scoped_lock interlock(event_handler_mutex);
|
||||
// we just delete the slot for address on which subscription expired in the different
|
||||
// thread which is triggered on every 10s we will renew the subscription if needed
|
||||
if (libbitcoin_clients[address]) {
|
||||
if (libbitcoin_clients[address].first && libbitcoin_clients[address].first->is_target_trxs_pool_empty()) {
|
||||
libbitcoin_clients.erase(address);
|
||||
libbitcoin_clients[address].first = nullptr;
|
||||
} else {
|
||||
libbitcoin_clients[address].second = false;
|
||||
}
|
||||
}
|
||||
|
||||
void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transaction& trx) {
|
||||
void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transaction& trx, const bool& is_test_net) {
|
||||
scoped_lock interlock(event_handler_mutex);
|
||||
std::vector<info_for_vin> result;
|
||||
uint32_t vout_seq = 0;
|
||||
|
|
@ -1702,7 +1707,7 @@ void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transacti
|
|||
std::vector<std::string> address_list;
|
||||
|
||||
libbitcoin::wallet::payment_address::list addresses;
|
||||
if (/*is_test_net*/true) {
|
||||
if (is_test_net) {
|
||||
addresses = o.addresses(libbitcoin::wallet::payment_address::testnet_p2kh,
|
||||
libbitcoin::wallet::payment_address::testnet_p2sh);
|
||||
} else {
|
||||
|
|
|
|||
Loading…
Reference in a new issue