Libbitcoin subscribe to address
- Exclude using libbitcoin block event - Use SON address for subscribing on the libbitcoin server - Use the event from libbitcoin server if something is related to subscribed address
This commit is contained in:
parent
d367c308b8
commit
8af1c851e6
4 changed files with 198 additions and 28 deletions
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
#include <bitcoin/explorer/config/transaction.hpp>
|
||||
#include <bitcoin/system/config/hash256.hpp>
|
||||
#include <bitcoin/explorer/callback_state.hpp>
|
||||
|
||||
#include <boost/xpressive/xpressive.hpp>
|
||||
|
||||
|
|
@ -54,6 +55,11 @@ libbitcoin_client::libbitcoin_client(std::string url) :
|
|||
is_connected = true;
|
||||
}
|
||||
|
||||
libbitcoin_client::~libbitcoin_client() {
|
||||
stop = true;
|
||||
sub_thr.detach();
|
||||
}
|
||||
|
||||
std::string libbitcoin_client::send_transaction(std::string tx) {
|
||||
|
||||
std::string res;
|
||||
|
|
@ -78,28 +84,41 @@ std::string libbitcoin_client::send_transaction(std::string tx) {
|
|||
return res;
|
||||
}
|
||||
|
||||
libbitcoin::chain::output::list libbitcoin_client::get_transaction(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions) {
|
||||
|
||||
libbitcoin::chain::output::list outs;
|
||||
|
||||
bool libbitcoin_client::get_transaction(const std::string tx, libbitcoin::chain::transaction& trx) {
|
||||
bool result = false;
|
||||
auto error_handler = [&](const std::error_code &ec) {
|
||||
elog("error on fetch_trx_by_hash: ${hash} ${error_code}", ("hash", tx_id)("error_code", ec.message()));
|
||||
elog("error on fetch_trx_by_hash: ${hash} ${error_code}", ("hash", tx)("error_code", ec.message()));
|
||||
result = false;
|
||||
};
|
||||
|
||||
auto transaction_handler = [&](const libbitcoin::chain::transaction &tx_handler) {
|
||||
tx_hash = libbitcoin::config::hash256(tx_handler.hash(false)).to_string();
|
||||
// TODO try to find this value (confirmitions)
|
||||
confirmitions = 1;
|
||||
outs = tx_handler.outputs();
|
||||
trx = tx_handler;
|
||||
result = true;
|
||||
};
|
||||
|
||||
libbitcoin::hash_digest hash = libbitcoin::config::hash256(tx_id);
|
||||
libbitcoin::hash_digest hash = libbitcoin::config::hash256(tx);
|
||||
|
||||
// obelisk_client.blockchain_fetch_transaction (error_handler, transaction_handler,hash);
|
||||
obelisk_client.blockchain_fetch_transaction2(error_handler, transaction_handler, hash);
|
||||
|
||||
obelisk_client.wait();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
libbitcoin::chain::output::list libbitcoin_client::get_transaction_outs(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions) {
|
||||
|
||||
libbitcoin::chain::output::list outs;
|
||||
libbitcoin::chain::transaction trx;
|
||||
|
||||
if (get_transaction(tx_id, trx)) {
|
||||
|
||||
tx_hash = libbitcoin::config::hash256(trx.hash(false)).to_string();
|
||||
// TODO try to find this value (confirmitions)
|
||||
confirmitions = 1;
|
||||
outs = trx.outputs();
|
||||
}
|
||||
|
||||
return outs;
|
||||
}
|
||||
|
||||
|
|
@ -224,4 +243,49 @@ uint64_t libbitcoin_client::get_average_fee_from_trxs(std::vector<libbitcoin::ch
|
|||
|
||||
return average_estimated_fee;
|
||||
}
|
||||
|
||||
void libbitcoin_client::subscribe_to_address(const std::string address_str, address_update_handler address_updated_callback_handler,
|
||||
subscription_expired_handler subcription_expired_callback_handler) {
|
||||
|
||||
subscription_add = address_str;
|
||||
this->address_updated_callback_handler = address_updated_callback_handler;
|
||||
this->subcription_expired_callback_handler = subcription_expired_callback_handler;
|
||||
sub_thr = std::thread(&libbitcoin_client::subscription_thr, this);
|
||||
}
|
||||
|
||||
void libbitcoin_client::subscription_thr() {
|
||||
|
||||
libbitcoin::wallet::payment_address address(subscription_add);
|
||||
|
||||
auto on_subscribed = [&](const std::error_code &error) {
|
||||
ilog("On subscribed ${error}", ("error", error.message()));
|
||||
};
|
||||
|
||||
auto on_error = [&](const std::error_code &error) {
|
||||
elog("On subscribed there is an error: ${error}", ("error", error.message()));
|
||||
};
|
||||
|
||||
auto on_update = [&](const std::error_code &error, uint16_t sequence, size_t height, const libbitcoin::hash_digest &tx_hash) {
|
||||
wlog("On update value error: ${error}", ("error", error.value()));
|
||||
if (!error.value()) {
|
||||
wlog("sequence: ${sequence}, height: ${height}, hash: ${hash}", ("sequence", sequence)("height", height)("hash", libbitcoin::config::hash256(tx_hash).to_string()));
|
||||
libbitcoin::chain::transaction trx;
|
||||
if (get_transaction(libbitcoin::config::hash256(tx_hash).to_string(), trx)) {
|
||||
address_updated_callback_handler(trx);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
obelisk_client.set_on_update(on_update);
|
||||
obelisk_client.subscribe_address(on_error, on_subscribed, address);
|
||||
obelisk_client.wait();
|
||||
|
||||
obelisk_client.monitor(SUBSCRIBE_TIME_DURATION);
|
||||
|
||||
if (!stop) {
|
||||
ilog("Subsription monitor expired, renew if needed ...");
|
||||
subcription_expired_callback_handler(subscription_add);
|
||||
}
|
||||
}
|
||||
|
||||
}} // namespace graphene::peerplays_sidechain
|
||||
|
|
@ -12,6 +12,7 @@
|
|||
#define DEAFULT_LIBBITCOIN_TRX_FEE (20000)
|
||||
#define MAX_TRXS_IN_MEMORY_POOL (30000)
|
||||
#define MIN_TRXS_IN_BUCKET (100)
|
||||
#define SUBSCRIBE_TIME_DURATION (2 * 60)
|
||||
|
||||
#define GENESIS_MAINNET_HASH "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
|
||||
#define GENESIS_TESTNET_HASH "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
|
||||
|
|
@ -22,6 +23,12 @@ namespace graphene { namespace peerplays_sidechain {
|
|||
typedef std::function<void(const libbitcoin::chain::block &)>
|
||||
block_update_handler;
|
||||
|
||||
typedef std::function<void(const libbitcoin::chain::transaction& trx)>
|
||||
address_update_handler;
|
||||
|
||||
typedef std::function<void(const std::string& )>
|
||||
subscription_expired_handler;
|
||||
|
||||
struct list_unspent_replay {
|
||||
std::string hash;
|
||||
uint64_t value;
|
||||
|
|
@ -31,16 +38,25 @@ struct list_unspent_replay {
|
|||
class libbitcoin_client {
|
||||
public:
|
||||
libbitcoin_client(std::string url);
|
||||
~libbitcoin_client();
|
||||
std::string send_transaction(const std::string tx);
|
||||
libbitcoin::chain::output::list get_transaction(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions);
|
||||
bool get_transaction(const std::string tx, libbitcoin::chain::transaction& trx);
|
||||
libbitcoin::chain::output::list get_transaction_outs(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions);
|
||||
std::vector<list_unspent_replay> listunspent(std::string address, double amount);
|
||||
uint64_t get_average_fee_from_trxs(std::vector<libbitcoin::chain::transaction> trx_list);
|
||||
uint64_t get_fee_from_trx(libbitcoin::chain::transaction trx);
|
||||
bool get_is_test_net();
|
||||
void subscribe_to_address(const std::string address_str, address_update_handler address_updated_callback_handler,
|
||||
subscription_expired_handler subcription_expired_callback_handler);
|
||||
|
||||
private:
|
||||
void subscription_thr();
|
||||
|
||||
libbitcoin::client::obelisk_client obelisk_client;
|
||||
libbitcoin::protocol::zmq::identifier id;
|
||||
std::string subscription_add;
|
||||
address_update_handler address_updated_callback_handler;
|
||||
subscription_expired_handler subcription_expired_callback_handler;
|
||||
|
||||
std::string protocol;
|
||||
std::string host;
|
||||
|
|
@ -48,6 +64,8 @@ private:
|
|||
std::string url;
|
||||
|
||||
bool is_connected = false;
|
||||
std::thread sub_thr;
|
||||
bool stop = false;
|
||||
};
|
||||
|
||||
}} // namespace graphene::peerplays_sidechain
|
||||
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
namespace graphene { namespace peerplays_sidechain {
|
||||
|
||||
#define SUBSCRIPTION_THREAD_INTERVAL (10000)
|
||||
|
||||
class btc_txout {
|
||||
public:
|
||||
std::string txid_;
|
||||
|
|
@ -244,11 +246,21 @@ private:
|
|||
std::string sign_transaction(const sidechain_transaction_object &sto);
|
||||
std::string send_transaction(const sidechain_transaction_object &sto);
|
||||
|
||||
void extract_deposit(const std::vector<info_for_vin> vins) ;
|
||||
void block_handle_event(const block_data &event_data);
|
||||
void subscribe_address_thread();
|
||||
void trx_handle_event(const libbitcoin::chain::transaction &event_data);
|
||||
void subscription_expired_event(const std::string& address);
|
||||
std::string get_redeemscript_for_userdeposit(const std::string &user_address);
|
||||
void on_changed_objects(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
|
||||
void on_changed_objects_cb(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
|
||||
|
||||
std::map<std::string, std::unique_ptr<libbitcoin_client>> libbitcoin_clients;
|
||||
void trx_event(const libbitcoin::chain::transaction &trx);
|
||||
|
||||
bool stop_sub_thr = true;
|
||||
|
||||
std::thread subscribe_thr;
|
||||
};
|
||||
|
||||
}} // namespace graphene::peerplays_sidechain
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
#include <graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <thread>
|
||||
|
||||
#include <boost/algorithm/hex.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
|
|
@ -426,7 +427,7 @@ btc_tx bitcoin_libbitcoin_client::getrawtransaction(const std::string &txid, con
|
|||
std::string tx_hash;
|
||||
uint32_t confirmitions;
|
||||
|
||||
libbitcoin::chain::output::list outs = get_transaction(txid, tx_hash, confirmitions);
|
||||
libbitcoin::chain::output::list outs = get_transaction_outs(txid, tx_hash, confirmitions);
|
||||
|
||||
if (tx_hash.empty()) {
|
||||
return tx;
|
||||
|
|
@ -726,10 +727,19 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
|
|||
}
|
||||
|
||||
listener = std::unique_ptr<zmq_listener>(new zmq_listener(ips[0], bitcoin_node_zmq_port));
|
||||
listener->block_event_received.connect([this](const block_data &block_event_data) {
|
||||
std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach();
|
||||
});
|
||||
|
||||
listener->trx_event_received.connect([this](const libbitcoin::chain::transaction &trx_event_data) {
|
||||
std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach();
|
||||
});
|
||||
|
||||
listener->start();
|
||||
} else {
|
||||
bitcoin_client = std::unique_ptr<bitcoin_libbitcoin_client>(new bitcoin_libbitcoin_client(libbitcoin_server_ip));
|
||||
|
||||
listener = std::unique_ptr<zmq_listener_libbitcoin>(new zmq_listener_libbitcoin(libbitcoin_server_ip, libbitcoin_block_zmq_port, libbitcoin_trx_zmq_port));
|
||||
stop_sub_thr = false;
|
||||
subscribe_thr = std::thread(&sidechain_net_handler_bitcoin::subscribe_address_thread, this);
|
||||
}
|
||||
|
||||
std::string chain_info = bitcoin_client->getblockchaininfo();
|
||||
|
|
@ -745,16 +755,6 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
|
|||
|
||||
bitcoin_client->getnetworkinfo();
|
||||
|
||||
listener->block_event_received.connect([this](const block_data &block_event_data) {
|
||||
std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach();
|
||||
});
|
||||
|
||||
listener->trx_event_received.connect([this](const libbitcoin::chain::transaction &trx_event_data) {
|
||||
std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach();
|
||||
});
|
||||
|
||||
listener->start();
|
||||
|
||||
database.changed_objects.connect([this](const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts) {
|
||||
on_changed_objects(ids, accounts);
|
||||
});
|
||||
|
|
@ -765,6 +765,11 @@ sidechain_net_handler_bitcoin::~sidechain_net_handler_bitcoin() {
|
|||
if (on_changed_objects_task.valid()) {
|
||||
on_changed_objects_task.cancel_and_wait(__FUNCTION__);
|
||||
}
|
||||
|
||||
if(subscribe_thr.joinable()) {
|
||||
stop_sub_thr = true;
|
||||
subscribe_thr.join();
|
||||
}
|
||||
} catch (fc::canceled_exception &) {
|
||||
// Expected exception. Move along.
|
||||
} catch (fc::exception &e) {
|
||||
|
|
@ -1655,15 +1660,79 @@ std::string sidechain_net_handler_bitcoin::send_transaction(const sidechain_tran
|
|||
return tx.get_txid().str();
|
||||
}
|
||||
|
||||
void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) {
|
||||
void sidechain_net_handler_bitcoin::subscribe_address_thread() {
|
||||
while (!stop_sub_thr) {
|
||||
const auto &sidechain_addresses_idx = database.get_index_type<sidechain_address_index>();
|
||||
const auto &sidechain_addresses_by_sidechain_idx = sidechain_addresses_idx.indices().get<by_sidechain>();
|
||||
const auto &sidechain_addresses_by_sidechain_range = sidechain_addresses_by_sidechain_idx.equal_range(sidechain);
|
||||
std::for_each(sidechain_addresses_by_sidechain_range.first, sidechain_addresses_by_sidechain_range.second,
|
||||
[&](const sidechain_address_object &sao) {
|
||||
scoped_lock interlock(event_handler_mutex);
|
||||
if (!sao.deposit_address.empty() && !libbitcoin_clients[sao.deposit_address]) {
|
||||
|
||||
auto vins = bitcoin_client->getblock(event_data);
|
||||
if (sao.expires > database.head_block_time()) {
|
||||
libbitcoin_clients[sao.deposit_address] = std::unique_ptr<libbitcoin_client>(new libbitcoin_client(libbitcoin_server_ip));
|
||||
auto trx_event_callback = std::bind(&sidechain_net_handler_bitcoin::trx_event, this, std::placeholders::_1);
|
||||
auto sub_expired_callback = std::bind(&sidechain_net_handler_bitcoin::subscription_expired_event, this, std::placeholders::_1);
|
||||
libbitcoin_clients[sao.deposit_address]->subscribe_to_address(sao.deposit_address, trx_event_callback, sub_expired_callback);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
add_to_son_listener_log("BLOCK : " + event_data.block_hash);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(SUBSCRIPTION_THREAD_INTERVAL));
|
||||
}
|
||||
|
||||
ilog("Exit from subsription thread ....");
|
||||
}
|
||||
|
||||
void sidechain_net_handler_bitcoin::subscription_expired_event(const std::string &address) {
|
||||
scoped_lock interlock(event_handler_mutex);
|
||||
const auto &sidechain_addresses_idx = database.get_index_type<sidechain_address_index>().indices().get<by_sidechain_and_deposit_address_and_expires>();
|
||||
// we just delete the slot for address on which subscription expired in the different
|
||||
// thread which is triggered on every 10s we will renew the subscription if needed
|
||||
if (libbitcoin_clients[address]) {
|
||||
libbitcoin_clients.erase(address);
|
||||
}
|
||||
}
|
||||
|
||||
void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transaction& trx) {
|
||||
scoped_lock interlock(event_handler_mutex);
|
||||
std::vector<info_for_vin> result;
|
||||
uint32_t vout_seq = 0;
|
||||
for (const auto &o : trx.outputs()) {
|
||||
std::vector<std::string> address_list;
|
||||
|
||||
libbitcoin::wallet::payment_address::list addresses;
|
||||
if (/*is_test_net*/true) {
|
||||
addresses = o.addresses(libbitcoin::wallet::payment_address::testnet_p2kh,
|
||||
libbitcoin::wallet::payment_address::testnet_p2sh);
|
||||
} else {
|
||||
addresses = o.addresses();
|
||||
}
|
||||
|
||||
for (auto &payment_address : addresses) {
|
||||
std::stringstream ss;
|
||||
ss << payment_address;
|
||||
address_list.emplace_back(ss.str());
|
||||
}
|
||||
|
||||
// addres list consists usual of one element
|
||||
for (auto &address : address_list) {
|
||||
const auto address_base58 = address;
|
||||
info_for_vin vin;
|
||||
vin.out.hash_tx = libbitcoin::config::hash256(trx.hash()).to_string();
|
||||
vin.out.amount = std::floor(o.value());
|
||||
vin.out.n_vout = vout_seq;
|
||||
vin.address = address_base58;
|
||||
result.push_back(vin);
|
||||
}
|
||||
vout_seq++;
|
||||
}
|
||||
|
||||
extract_deposit(result);
|
||||
}
|
||||
|
||||
void sidechain_net_handler_bitcoin::extract_deposit(const std::vector<info_for_vin> vins) {
|
||||
const auto &sidechain_addresses_idx = database.get_index_type<sidechain_address_index>().indices().get<by_sidechain_and_deposit_address_and_expires>();
|
||||
for (const auto &v : vins) {
|
||||
// !!! EXTRACT DEPOSIT ADDRESS FROM SIDECHAIN ADDRESS OBJECT
|
||||
const auto &addr_itr = sidechain_addresses_idx.find(std::make_tuple(sidechain, v.address, time_point_sec::maximum()));
|
||||
|
|
@ -1697,6 +1766,13 @@ void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_d
|
|||
}
|
||||
}
|
||||
|
||||
void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) {
|
||||
auto vins = bitcoin_client->getblock(event_data);
|
||||
add_to_son_listener_log("BLOCK : " + event_data.block_hash);
|
||||
scoped_lock interlock(event_handler_mutex);
|
||||
extract_deposit(vins);
|
||||
}
|
||||
|
||||
void sidechain_net_handler_bitcoin::trx_handle_event(const libbitcoin::chain::transaction &trx_data) {
|
||||
bitcoin_client->import_trx_to_memory_pool(trx_data);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue