From 8af1c851e6e3af1f47175e38303fd81ed5c317d5 Mon Sep 17 00:00:00 2001 From: hirunda Date: Wed, 31 May 2023 21:43:13 +0200 Subject: [PATCH] Libbitcoin subscribe to address - Exclude using libbitcoin block event - Use SON address for subscribing on the libbitcoin server - Use the event from libbitcoin server if something is related to subscribed address --- .../bitcoin/libbitcoin_client.cpp | 84 +++++++++++-- .../bitcoin/libbitcoin_client.hpp | 20 +++- .../sidechain_net_handler_bitcoin.hpp | 12 ++ .../sidechain_net_handler_bitcoin.cpp | 110 +++++++++++++++--- 4 files changed, 198 insertions(+), 28 deletions(-) diff --git a/libraries/plugins/peerplays_sidechain/bitcoin/libbitcoin_client.cpp b/libraries/plugins/peerplays_sidechain/bitcoin/libbitcoin_client.cpp index fdcaa98e..e4d897f2 100644 --- a/libraries/plugins/peerplays_sidechain/bitcoin/libbitcoin_client.cpp +++ b/libraries/plugins/peerplays_sidechain/bitcoin/libbitcoin_client.cpp @@ -5,6 +5,7 @@ #include #include +#include #include @@ -54,6 +55,11 @@ libbitcoin_client::libbitcoin_client(std::string url) : is_connected = true; } +libbitcoin_client::~libbitcoin_client() { + stop = true; + sub_thr.detach(); +} + std::string libbitcoin_client::send_transaction(std::string tx) { std::string res; @@ -78,28 +84,41 @@ std::string libbitcoin_client::send_transaction(std::string tx) { return res; } -libbitcoin::chain::output::list libbitcoin_client::get_transaction(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions) { - - libbitcoin::chain::output::list outs; - +bool libbitcoin_client::get_transaction(const std::string tx, libbitcoin::chain::transaction& trx) { + bool result = false; auto error_handler = [&](const std::error_code &ec) { - elog("error on fetch_trx_by_hash: ${hash} ${error_code}", ("hash", tx_id)("error_code", ec.message())); + elog("error on fetch_trx_by_hash: ${hash} ${error_code}", ("hash", tx)("error_code", ec.message())); + result = false; }; auto transaction_handler = [&](const libbitcoin::chain::transaction &tx_handler) { - tx_hash = libbitcoin::config::hash256(tx_handler.hash(false)).to_string(); - // TODO try to find this value (confirmitions) - confirmitions = 1; - outs = tx_handler.outputs(); + trx = tx_handler; + result = true; }; - libbitcoin::hash_digest hash = libbitcoin::config::hash256(tx_id); + libbitcoin::hash_digest hash = libbitcoin::config::hash256(tx); // obelisk_client.blockchain_fetch_transaction (error_handler, transaction_handler,hash); obelisk_client.blockchain_fetch_transaction2(error_handler, transaction_handler, hash); obelisk_client.wait(); + return result; +} + +libbitcoin::chain::output::list libbitcoin_client::get_transaction_outs(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions) { + + libbitcoin::chain::output::list outs; + libbitcoin::chain::transaction trx; + + if (get_transaction(tx_id, trx)) { + + tx_hash = libbitcoin::config::hash256(trx.hash(false)).to_string(); + // TODO try to find this value (confirmitions) + confirmitions = 1; + outs = trx.outputs(); + } + return outs; } @@ -224,4 +243,49 @@ uint64_t libbitcoin_client::get_average_fee_from_trxs(std::vectoraddress_updated_callback_handler = address_updated_callback_handler; + this->subcription_expired_callback_handler = subcription_expired_callback_handler; + sub_thr = std::thread(&libbitcoin_client::subscription_thr, this); +} + +void libbitcoin_client::subscription_thr() { + + libbitcoin::wallet::payment_address address(subscription_add); + + auto on_subscribed = [&](const std::error_code &error) { + ilog("On subscribed ${error}", ("error", error.message())); + }; + + auto on_error = [&](const std::error_code &error) { + elog("On subscribed there is an error: ${error}", ("error", error.message())); + }; + + auto on_update = [&](const std::error_code &error, uint16_t sequence, size_t height, const libbitcoin::hash_digest &tx_hash) { + wlog("On update value error: ${error}", ("error", error.value())); + if (!error.value()) { + wlog("sequence: ${sequence}, height: ${height}, hash: ${hash}", ("sequence", sequence)("height", height)("hash", libbitcoin::config::hash256(tx_hash).to_string())); + libbitcoin::chain::transaction trx; + if (get_transaction(libbitcoin::config::hash256(tx_hash).to_string(), trx)) { + address_updated_callback_handler(trx); + } + } + }; + + obelisk_client.set_on_update(on_update); + obelisk_client.subscribe_address(on_error, on_subscribed, address); + obelisk_client.wait(); + + obelisk_client.monitor(SUBSCRIBE_TIME_DURATION); + + if (!stop) { + ilog("Subsription monitor expired, renew if needed ..."); + subcription_expired_callback_handler(subscription_add); + } +} + }} // namespace graphene::peerplays_sidechain \ No newline at end of file diff --git a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/bitcoin/libbitcoin_client.hpp b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/bitcoin/libbitcoin_client.hpp index 4426983a..665af1d5 100644 --- a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/bitcoin/libbitcoin_client.hpp +++ b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/bitcoin/libbitcoin_client.hpp @@ -12,6 +12,7 @@ #define DEAFULT_LIBBITCOIN_TRX_FEE (20000) #define MAX_TRXS_IN_MEMORY_POOL (30000) #define MIN_TRXS_IN_BUCKET (100) +#define SUBSCRIBE_TIME_DURATION (2 * 60) #define GENESIS_MAINNET_HASH "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" #define GENESIS_TESTNET_HASH "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943" @@ -22,6 +23,12 @@ namespace graphene { namespace peerplays_sidechain { typedef std::function block_update_handler; +typedef std::function + address_update_handler; + +typedef std::function + subscription_expired_handler; + struct list_unspent_replay { std::string hash; uint64_t value; @@ -31,16 +38,25 @@ struct list_unspent_replay { class libbitcoin_client { public: libbitcoin_client(std::string url); + ~libbitcoin_client(); std::string send_transaction(const std::string tx); - libbitcoin::chain::output::list get_transaction(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions); + bool get_transaction(const std::string tx, libbitcoin::chain::transaction& trx); + libbitcoin::chain::output::list get_transaction_outs(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions); std::vector listunspent(std::string address, double amount); uint64_t get_average_fee_from_trxs(std::vector trx_list); uint64_t get_fee_from_trx(libbitcoin::chain::transaction trx); bool get_is_test_net(); + void subscribe_to_address(const std::string address_str, address_update_handler address_updated_callback_handler, + subscription_expired_handler subcription_expired_callback_handler); private: + void subscription_thr(); + libbitcoin::client::obelisk_client obelisk_client; libbitcoin::protocol::zmq::identifier id; + std::string subscription_add; + address_update_handler address_updated_callback_handler; + subscription_expired_handler subcription_expired_callback_handler; std::string protocol; std::string host; @@ -48,6 +64,8 @@ private: std::string url; bool is_connected = false; + std::thread sub_thr; + bool stop = false; }; }} // namespace graphene::peerplays_sidechain \ No newline at end of file diff --git a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp index 96e8ec9c..4c7bb4a3 100644 --- a/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp +++ b/libraries/plugins/peerplays_sidechain/include/graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp @@ -18,6 +18,8 @@ namespace graphene { namespace peerplays_sidechain { +#define SUBSCRIPTION_THREAD_INTERVAL (10000) + class btc_txout { public: std::string txid_; @@ -244,11 +246,21 @@ private: std::string sign_transaction(const sidechain_transaction_object &sto); std::string send_transaction(const sidechain_transaction_object &sto); + void extract_deposit(const std::vector vins) ; void block_handle_event(const block_data &event_data); + void subscribe_address_thread(); void trx_handle_event(const libbitcoin::chain::transaction &event_data); + void subscription_expired_event(const std::string& address); std::string get_redeemscript_for_userdeposit(const std::string &user_address); void on_changed_objects(const vector &ids, const flat_set &accounts); void on_changed_objects_cb(const vector &ids, const flat_set &accounts); + + std::map> libbitcoin_clients; + void trx_event(const libbitcoin::chain::transaction &trx); + + bool stop_sub_thr = true; + + std::thread subscribe_thr; }; }} // namespace graphene::peerplays_sidechain diff --git a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp index 76e23ab0..7baeee72 100644 --- a/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp +++ b/libraries/plugins/peerplays_sidechain/sidechain_net_handler_bitcoin.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include @@ -426,7 +427,7 @@ btc_tx bitcoin_libbitcoin_client::getrawtransaction(const std::string &txid, con std::string tx_hash; uint32_t confirmitions; - libbitcoin::chain::output::list outs = get_transaction(txid, tx_hash, confirmitions); + libbitcoin::chain::output::list outs = get_transaction_outs(txid, tx_hash, confirmitions); if (tx_hash.empty()) { return tx; @@ -726,10 +727,19 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain } listener = std::unique_ptr(new zmq_listener(ips[0], bitcoin_node_zmq_port)); + listener->block_event_received.connect([this](const block_data &block_event_data) { + std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach(); + }); + + listener->trx_event_received.connect([this](const libbitcoin::chain::transaction &trx_event_data) { + std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach(); + }); + + listener->start(); } else { bitcoin_client = std::unique_ptr(new bitcoin_libbitcoin_client(libbitcoin_server_ip)); - - listener = std::unique_ptr(new zmq_listener_libbitcoin(libbitcoin_server_ip, libbitcoin_block_zmq_port, libbitcoin_trx_zmq_port)); + stop_sub_thr = false; + subscribe_thr = std::thread(&sidechain_net_handler_bitcoin::subscribe_address_thread, this); } std::string chain_info = bitcoin_client->getblockchaininfo(); @@ -745,16 +755,6 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain bitcoin_client->getnetworkinfo(); - listener->block_event_received.connect([this](const block_data &block_event_data) { - std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach(); - }); - - listener->trx_event_received.connect([this](const libbitcoin::chain::transaction &trx_event_data) { - std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach(); - }); - - listener->start(); - database.changed_objects.connect([this](const vector &ids, const flat_set &accounts) { on_changed_objects(ids, accounts); }); @@ -765,6 +765,11 @@ sidechain_net_handler_bitcoin::~sidechain_net_handler_bitcoin() { if (on_changed_objects_task.valid()) { on_changed_objects_task.cancel_and_wait(__FUNCTION__); } + + if(subscribe_thr.joinable()) { + stop_sub_thr = true; + subscribe_thr.join(); + } } catch (fc::canceled_exception &) { // Expected exception. Move along. } catch (fc::exception &e) { @@ -1655,15 +1660,79 @@ std::string sidechain_net_handler_bitcoin::send_transaction(const sidechain_tran return tx.get_txid().str(); } -void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) { +void sidechain_net_handler_bitcoin::subscribe_address_thread() { + while (!stop_sub_thr) { + const auto &sidechain_addresses_idx = database.get_index_type(); + const auto &sidechain_addresses_by_sidechain_idx = sidechain_addresses_idx.indices().get(); + const auto &sidechain_addresses_by_sidechain_range = sidechain_addresses_by_sidechain_idx.equal_range(sidechain); + std::for_each(sidechain_addresses_by_sidechain_range.first, sidechain_addresses_by_sidechain_range.second, + [&](const sidechain_address_object &sao) { + scoped_lock interlock(event_handler_mutex); + if (!sao.deposit_address.empty() && !libbitcoin_clients[sao.deposit_address]) { - auto vins = bitcoin_client->getblock(event_data); + if (sao.expires > database.head_block_time()) { + libbitcoin_clients[sao.deposit_address] = std::unique_ptr(new libbitcoin_client(libbitcoin_server_ip)); + auto trx_event_callback = std::bind(&sidechain_net_handler_bitcoin::trx_event, this, std::placeholders::_1); + auto sub_expired_callback = std::bind(&sidechain_net_handler_bitcoin::subscription_expired_event, this, std::placeholders::_1); + libbitcoin_clients[sao.deposit_address]->subscribe_to_address(sao.deposit_address, trx_event_callback, sub_expired_callback); + } + } + }); - add_to_son_listener_log("BLOCK : " + event_data.block_hash); + std::this_thread::sleep_for(std::chrono::milliseconds(SUBSCRIPTION_THREAD_INTERVAL)); + } + ilog("Exit from subsription thread ...."); +} + +void sidechain_net_handler_bitcoin::subscription_expired_event(const std::string &address) { scoped_lock interlock(event_handler_mutex); - const auto &sidechain_addresses_idx = database.get_index_type().indices().get(); + // we just delete the slot for address on which subscription expired in the different + // thread which is triggered on every 10s we will renew the subscription if needed + if (libbitcoin_clients[address]) { + libbitcoin_clients.erase(address); + } +} +void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transaction& trx) { + scoped_lock interlock(event_handler_mutex); + std::vector result; + uint32_t vout_seq = 0; + for (const auto &o : trx.outputs()) { + std::vector address_list; + + libbitcoin::wallet::payment_address::list addresses; + if (/*is_test_net*/true) { + addresses = o.addresses(libbitcoin::wallet::payment_address::testnet_p2kh, + libbitcoin::wallet::payment_address::testnet_p2sh); + } else { + addresses = o.addresses(); + } + + for (auto &payment_address : addresses) { + std::stringstream ss; + ss << payment_address; + address_list.emplace_back(ss.str()); + } + + // addres list consists usual of one element + for (auto &address : address_list) { + const auto address_base58 = address; + info_for_vin vin; + vin.out.hash_tx = libbitcoin::config::hash256(trx.hash()).to_string(); + vin.out.amount = std::floor(o.value()); + vin.out.n_vout = vout_seq; + vin.address = address_base58; + result.push_back(vin); + } + vout_seq++; + } + + extract_deposit(result); +} + +void sidechain_net_handler_bitcoin::extract_deposit(const std::vector vins) { + const auto &sidechain_addresses_idx = database.get_index_type().indices().get(); for (const auto &v : vins) { // !!! EXTRACT DEPOSIT ADDRESS FROM SIDECHAIN ADDRESS OBJECT const auto &addr_itr = sidechain_addresses_idx.find(std::make_tuple(sidechain, v.address, time_point_sec::maximum())); @@ -1697,6 +1766,13 @@ void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_d } } +void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) { + auto vins = bitcoin_client->getblock(event_data); + add_to_son_listener_log("BLOCK : " + event_data.block_hash); + scoped_lock interlock(event_handler_mutex); + extract_deposit(vins); +} + void sidechain_net_handler_bitcoin::trx_handle_event(const libbitcoin::chain::transaction &trx_data) { bitcoin_client->import_trx_to_memory_pool(trx_data); }