Libbitcoin subscribe to address #807

Closed
hirunda wants to merge 5 commits from local_dev_subscribe_address into develop
17 changed files with 561 additions and 353 deletions

View file

@ -368,8 +368,7 @@ vector<operation_history_object> history_api::get_account_history(const std::str
return _app.elasticsearch_thread->async([&es, &account, &stop, &limit, &start]() { return _app.elasticsearch_thread->async([&es, &account, &stop, &limit, &start]() {
return es->get_account_history(account, stop, limit, start); return es->get_account_history(account, stop, limit, start);
}, },
"thread invoke for method " BOOST_PP_STRINGIZE(method_name)) "thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait();
.wait();
} }
} }

View file

@ -13,7 +13,7 @@
namespace graphene { namespace peerplays_sidechain { namespace graphene { namespace peerplays_sidechain {
libbitcoin_client::libbitcoin_client(std::string url) : libbitcoin_client::libbitcoin_client(const std::string &url) :
obelisk_client(LIBBITCOIN_SERVER_TIMEOUT, LIBBITCOIN_SERVER_RETRIES) { obelisk_client(LIBBITCOIN_SERVER_TIMEOUT, LIBBITCOIN_SERVER_RETRIES) {
std::string reg_expr = "^((?P<Protocol>https|http|tcp):\\/\\/)?(?P<Host>[a-zA-Z0-9\\-\\.]+)(:(?P<Port>\\d{1,5}))?(?P<Target>\\/.+)?"; std::string reg_expr = "^((?P<Protocol>https|http|tcp):\\/\\/)?(?P<Host>[a-zA-Z0-9\\-\\.]+)(:(?P<Port>\\d{1,5}))?(?P<Target>\\/.+)?";
@ -50,11 +50,19 @@ libbitcoin_client::libbitcoin_client(std::string url) :
if (!obelisk_client.connect(connection)) { if (!obelisk_client.connect(connection)) {
elog("Can't connect libbitcoin for url: ${url}", ("url", final_url)); elog("Can't connect libbitcoin for url: ${url}", ("url", final_url));
} }
is_connected = true;
} }
std::string libbitcoin_client::send_transaction(std::string tx) { libbitcoin_client::~libbitcoin_client() {
if (poller_trx_thr.joinable()) {
stop_poller_trx_thread = true;
poller_transacion_done.notify_all();
poller_trx_thr.join();
}
stop = true;
sub_thr.detach();
}
std::string libbitcoin_client::send_transaction(const std::string &tx) {
std::string res; std::string res;
@ -78,32 +86,45 @@ std::string libbitcoin_client::send_transaction(std::string tx) {
return res; return res;
} }
libbitcoin::chain::output::list libbitcoin_client::get_transaction(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions) { bool libbitcoin_client::get_transaction(const std::string &tx, libbitcoin::chain::transaction &trx) {
bool result = false;
libbitcoin::chain::output::list outs;
auto error_handler = [&](const std::error_code &ec) { auto error_handler = [&](const std::error_code &ec) {
elog("error on fetch_trx_by_hash: ${hash} ${error_code}", ("hash", tx_id)("error_code", ec.message())); elog("error on fetch_trx_by_hash: ${hash} ${error_code}", ("hash", tx)("error_code", ec.message()));
result = false;
}; };
auto transaction_handler = [&](const libbitcoin::chain::transaction &tx_handler) { auto transaction_handler = [&](const libbitcoin::chain::transaction &tx_handler) {
tx_hash = libbitcoin::config::hash256(tx_handler.hash(false)).to_string(); trx = tx_handler;
// TODO try to find this value (confirmitions) result = true;
confirmitions = 1;
outs = tx_handler.outputs();
}; };
libbitcoin::hash_digest hash = libbitcoin::config::hash256(tx_id); libbitcoin::hash_digest hash = libbitcoin::config::hash256(tx);
// obelisk_client.blockchain_fetch_transaction (error_handler, transaction_handler,hash); // obelisk_client.blockchain_fetch_transaction (error_handler, transaction_handler,hash);
obelisk_client.blockchain_fetch_transaction2(error_handler, transaction_handler, hash); obelisk_client.blockchain_fetch_transaction2(error_handler, transaction_handler, hash);
obelisk_client.wait(); obelisk_client.wait();
return result;
}
libbitcoin::chain::output::list libbitcoin_client::get_transaction_outs(const std::string &tx_id, std::string &tx_hash, uint32_t &confirmitions) {
libbitcoin::chain::output::list outs;
libbitcoin::chain::transaction trx;
if (get_transaction(tx_id, trx)) {
tx_hash = libbitcoin::config::hash256(trx.hash(false)).to_string();
// TODO try to find this value (confirmitions)
confirmitions = 1;
outs = trx.outputs();
}
return outs; return outs;
} }
std::vector<list_unspent_replay> libbitcoin_client::listunspent(std::string address, double amount) { std::vector<list_unspent_replay> libbitcoin_client::listunspent(const std::string &address, double amount) {
std::vector<list_unspent_replay> result; std::vector<list_unspent_replay> result;
auto error_handler = [&](const std::error_code &ec) { auto error_handler = [&](const std::error_code &ec) {
@ -133,7 +154,9 @@ std::vector<list_unspent_replay> libbitcoin_client::listunspent(std::string addr
bool libbitcoin_client::get_is_test_net() { bool libbitcoin_client::get_is_test_net() {
bool result = false; if (is_test_net != -1) {
return is_test_net;
}
auto error_handler = [&](const std::error_code &ec) { auto error_handler = [&](const std::error_code &ec) {
elog("error on fetching genesis block ${error_code}", ("error_code", ec.message())); elog("error on fetching genesis block ${error_code}", ("error_code", ec.message()));
@ -142,17 +165,19 @@ bool libbitcoin_client::get_is_test_net() {
auto block_header_handler = [&](const libbitcoin::chain::header &block_header) { auto block_header_handler = [&](const libbitcoin::chain::header &block_header) {
std::string hash_str = libbitcoin::config::hash256(block_header.hash()).to_string(); std::string hash_str = libbitcoin::config::hash256(block_header.hash()).to_string();
if (hash_str == GENESIS_TESTNET_HASH || hash_str == GENESIS_REGTEST_HASH) { if (hash_str == GENESIS_TESTNET_HASH || hash_str == GENESIS_REGTEST_HASH) {
result = true; is_test_net = 1;
} else {
is_test_net = 0;
} }
}; };
obelisk_client.blockchain_fetch_block_header(error_handler, block_header_handler, 0); obelisk_client.blockchain_fetch_block_header(error_handler, block_header_handler, 0);
obelisk_client.wait(); obelisk_client.wait();
return result; return is_test_net;
} }
uint64_t libbitcoin_client::get_fee_from_trx(libbitcoin::chain::transaction trx) { uint64_t libbitcoin_client::get_fee_from_trx(const libbitcoin::chain::transaction &trx) {
bool general_fee_est_error = false; bool general_fee_est_error = false;
if (trx.is_coinbase()) { if (trx.is_coinbase()) {
@ -201,7 +226,7 @@ uint64_t libbitcoin_client::get_fee_from_trx(libbitcoin::chain::transaction trx)
} }
} }
uint64_t libbitcoin_client::get_average_fee_from_trxs(std::vector<libbitcoin::chain::transaction> trx_list) { uint64_t libbitcoin_client::get_average_fee_from_trxs(const std::vector<libbitcoin::chain::transaction> &trx_list) {
std::vector<uint64_t> fee_per_trxs; std::vector<uint64_t> fee_per_trxs;
for (auto &trx : trx_list) { for (auto &trx : trx_list) {
@ -224,4 +249,81 @@ uint64_t libbitcoin_client::get_average_fee_from_trxs(std::vector<libbitcoin::ch
return average_estimated_fee; return average_estimated_fee;
} }
void libbitcoin_client::subscribe_to_address(const std::string &address_str, address_update_handler address_updated_callback_handler,
subscription_expired_handler subcription_expired_callback_handler) {
subscription_add = address_str;
this->address_updated_callback_handler = address_updated_callback_handler;
this->subcription_expired_callback_handler = subcription_expired_callback_handler;
sub_thr = std::thread(&libbitcoin_client::subscription_thr, this);
poller_trx_thr = std::thread(&libbitcoin_client::poller_transaction_thr, this);
}
void libbitcoin_client::poller_transaction_thr() {
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
while (!stop_poller_trx_thread) {
libbitcoin::chain::transaction trx;
if (!target_trxs_pool.empty() && get_transaction(libbitcoin::config::hash256(target_trxs_pool.back()).to_string(), trx)) {
target_trxs_pool.pop_back();
address_updated_callback_handler(trx, get_is_test_net());
}
poller_transacion_done.wait_for(lck, std::chrono::minutes(1));
}
wlog("Exit from poller_transaction_thr() ...");
}
bool libbitcoin_client::is_target_trxs_pool_empty() {
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
return target_trxs_pool.empty();
}
void libbitcoin_client::subscription_thr() {
libbitcoin::wallet::payment_address address(subscription_add);
auto on_subscribed = [&](const std::error_code &error) {
ilog("On subscribed ${error}", ("error", error.message()));
};
auto on_error = [&](const std::error_code &error) {
elog("On subscribed there is an error: ${error}", ("error", error.message()));
};
auto on_update = [&](const std::error_code &error, uint16_t sequence, size_t height, const libbitcoin::hash_digest &tx_hash) {
wlog("On update value error: ${error}", ("error", error.value()));
if (!error.value()) {
wlog("sequence: ${sequence}, height: ${height}, hash: ${hash}", ("sequence", sequence)("height", height)("hash", libbitcoin::config::hash256(tx_hash).to_string()));
libbitcoin::chain::transaction trx;
if (height == 0) {
std::unique_lock<std::mutex> lck(trxs_pool_mutex);
target_trxs_pool.emplace_back(tx_hash);
} else if ((get_transaction(libbitcoin::config::hash256(tx_hash).to_string(), trx))) {
address_updated_callback_handler(trx, get_is_test_net());
}
}
};
obelisk_client.set_on_update(on_update);
obelisk_client.subscribe_address(on_error, on_subscribed, address);
obelisk_client.wait();
obelisk_client.monitor(SUBSCRIBE_TIME_DURATION);
if (poller_trx_thr.joinable()) {
stop_poller_trx_thread = true;
poller_transacion_done.notify_all();
poller_trx_thr.join();
}
if (!stop) {
ilog("Subsription monitor expired, renew if needed ...");
subcription_expired_callback_handler(subscription_add);
}
}
}} // namespace graphene::peerplays_sidechain }} // namespace graphene::peerplays_sidechain

View file

@ -12,6 +12,7 @@
#define DEAFULT_LIBBITCOIN_TRX_FEE (20000) #define DEAFULT_LIBBITCOIN_TRX_FEE (20000)
#define MAX_TRXS_IN_MEMORY_POOL (30000) #define MAX_TRXS_IN_MEMORY_POOL (30000)
#define MIN_TRXS_IN_BUCKET (100) #define MIN_TRXS_IN_BUCKET (100)
#define SUBSCRIBE_TIME_DURATION (30)
#define GENESIS_MAINNET_HASH "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" #define GENESIS_MAINNET_HASH "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
#define GENESIS_TESTNET_HASH "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943" #define GENESIS_TESTNET_HASH "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
@ -19,8 +20,8 @@
namespace graphene { namespace peerplays_sidechain { namespace graphene { namespace peerplays_sidechain {
typedef std::function<void(const libbitcoin::chain::block &)> using address_update_handler = std::function<void(const libbitcoin::chain::transaction &trx, const bool &is_test_net)>;
block_update_handler; using subscription_expired_handler = std::function<void(const std::string &)>;
struct list_unspent_replay { struct list_unspent_replay {
std::string hash; std::string hash;
@ -30,24 +31,43 @@ struct list_unspent_replay {
class libbitcoin_client { class libbitcoin_client {
public: public:
libbitcoin_client(std::string url); explicit libbitcoin_client(const std::string &url);
std::string send_transaction(const std::string tx); ~libbitcoin_client();
libbitcoin::chain::output::list get_transaction(std::string tx_id, std::string &tx_hash, uint32_t &confirmitions); std::string send_transaction(const std::string &tx);
std::vector<list_unspent_replay> listunspent(std::string address, double amount); bool get_transaction(const std::string &tx, libbitcoin::chain::transaction &trx);
uint64_t get_average_fee_from_trxs(std::vector<libbitcoin::chain::transaction> trx_list); libbitcoin::chain::output::list get_transaction_outs(const std::string &tx_id, std::string &tx_hash, uint32_t &confirmitions);
uint64_t get_fee_from_trx(libbitcoin::chain::transaction trx); std::vector<list_unspent_replay> listunspent(const std::string &address, double amount);
uint64_t get_average_fee_from_trxs(const std::vector<libbitcoin::chain::transaction> &trx_list);
uint64_t get_fee_from_trx(const libbitcoin::chain::transaction &trx);
bool get_is_test_net(); bool get_is_test_net();
void subscribe_to_address(const std::string &address_str, address_update_handler address_updated_callback_handler,
subscription_expired_handler subcription_expired_callback_handler);
bool is_target_trxs_pool_empty();
private: private:
void subscription_thr();
void poller_transaction_thr();
libbitcoin::client::obelisk_client obelisk_client; libbitcoin::client::obelisk_client obelisk_client;
libbitcoin::protocol::zmq::identifier id; libbitcoin::protocol::zmq::identifier id;
std::string subscription_add;
address_update_handler address_updated_callback_handler;
subscription_expired_handler subcription_expired_callback_handler;
std::string protocol; std::string protocol;
std::string host; std::string host;
std::string port; std::string port;
std::string url; std::string url;
bool is_connected = false; std::vector<libbitcoin::hash_digest> target_trxs_pool;
std::mutex trxs_pool_mutex;
std::condition_variable poller_transacion_done;
int16_t is_test_net = -1;
std::thread sub_thr;
std::thread poller_trx_thr;
bool stop = false;
bool stop_poller_trx_thread = false;
}; };
}} // namespace graphene::peerplays_sidechain }} // namespace graphene::peerplays_sidechain

View file

@ -18,6 +18,8 @@
namespace graphene { namespace peerplays_sidechain { namespace graphene { namespace peerplays_sidechain {
#define SUBSCRIPTION_THREAD_INTERVAL (10000)
class btc_txout { class btc_txout {
public: public:
std::string txid_; std::string txid_;
@ -63,6 +65,7 @@ public:
std::string label; std::string label;
}; };
virtual ~bitcoin_client_base() = default;
virtual uint64_t estimatesmartfee(uint16_t conf_target = 1) = 0; virtual uint64_t estimatesmartfee(uint16_t conf_target = 1) = 0;
virtual std::vector<info_for_vin> getblock(const block_data &block, int32_t verbosity = 2) = 0; virtual std::vector<info_for_vin> getblock(const block_data &block, int32_t verbosity = 2) = 0;
virtual btc_tx getrawtransaction(const std::string &txid, const bool verbose = false) = 0; virtual btc_tx getrawtransaction(const std::string &txid, const bool verbose = false) = 0;
@ -70,9 +73,7 @@ public:
virtual std::string getblockchaininfo() = 0; virtual std::string getblockchaininfo() = 0;
virtual std::vector<btc_txout> listunspent_by_address_and_amount(const std::string &address, double transfer_amount, const uint32_t minconf = 1, const uint32_t maxconf = 9999999) = 0; virtual std::vector<btc_txout> listunspent_by_address_and_amount(const std::string &address, double transfer_amount, const uint32_t minconf = 1, const uint32_t maxconf = 9999999) = 0;
virtual std::string sendrawtransaction(const std::string &tx_hex) = 0; virtual std::string sendrawtransaction(const std::string &tx_hex) = 0;
virtual void importmulti(const std::vector<multi_params> &address_or_script_array, const bool rescan = true) { virtual void importmulti(const std::vector<multi_params> &address_or_script_array, const bool rescan = true){};
;
};
virtual std::string loadwallet(const std::string &filename) { virtual std::string loadwallet(const std::string &filename) {
return ""; return "";
}; };
@ -100,20 +101,20 @@ public:
public: public:
bitcoin_rpc_client(const std::vector<rpc_credentials> &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection); bitcoin_rpc_client(const std::vector<rpc_credentials> &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection);
uint64_t estimatesmartfee(uint16_t conf_target = 1); uint64_t estimatesmartfee(uint16_t conf_target = 1) final;
std::vector<info_for_vin> getblock(const block_data &block, int32_t verbosity = 2); std::vector<info_for_vin> getblock(const block_data &block, int32_t verbosity = 2);
btc_tx getrawtransaction(const std::string &txid, const bool verbose = false); btc_tx getrawtransaction(const std::string &txid, const bool verbose = false) final;
void getnetworkinfo(); void getnetworkinfo() final;
std::string getblockchaininfo(); std::string getblockchaininfo() final;
void importmulti(const std::vector<multi_params> &address_or_script_array, const bool rescan = true); void importmulti(const std::vector<multi_params> &address_or_script_array, const bool rescan = true) final;
std::vector<btc_txout> listunspent(const uint32_t minconf = 1, const uint32_t maxconf = 9999999); std::vector<btc_txout> listunspent(const uint32_t minconf = 1, const uint32_t maxconf = 9999999);
std::vector<btc_txout> listunspent_by_address_and_amount(const std::string &address, double transfer_amount, const uint32_t minconf = 1, const uint32_t maxconf = 9999999); std::vector<btc_txout> listunspent_by_address_and_amount(const std::string &address, double transfer_amount, const uint32_t minconf = 1, const uint32_t maxconf = 9999999) final;
std::string loadwallet(const std::string &filename); std::string loadwallet(const std::string &filename) final;
std::string sendrawtransaction(const std::string &tx_hex); std::string sendrawtransaction(const std::string &tx_hex) final;
std::string walletlock(); std::string walletlock() final;
bool walletpassphrase(const std::string &passphrase, uint32_t timeout = 60); bool walletpassphrase(const std::string &passphrase, uint32_t timeout = 60) final;
virtual uint64_t ping(rpc_connection &conn) const override; uint64_t ping(rpc_connection &conn) const final;
private: private:
std::string ip; std::string ip;
@ -126,14 +127,14 @@ private:
class bitcoin_libbitcoin_client : public bitcoin_client_base, public libbitcoin_client { class bitcoin_libbitcoin_client : public bitcoin_client_base, public libbitcoin_client {
public: public:
bitcoin_libbitcoin_client(std::string url); explicit bitcoin_libbitcoin_client(const std::string &url);
uint64_t estimatesmartfee(uint16_t conf_target = 1); uint64_t estimatesmartfee(uint16_t conf_target = 1) final;
std::vector<info_for_vin> getblock(const block_data &block, int32_t verbosity = 2); std::vector<info_for_vin> getblock(const block_data &block, int32_t verbosity = 2);
btc_tx getrawtransaction(const std::string &txid, const bool verbose = false); btc_tx getrawtransaction(const std::string &txid, const bool verbose = false) final;
void getnetworkinfo(); void getnetworkinfo() final;
std::string getblockchaininfo(); std::string getblockchaininfo() final;
std::vector<btc_txout> listunspent_by_address_and_amount(const std::string &address, double transfer_amount, const uint32_t minconf = 1, const uint32_t maxconf = 9999999); std::vector<btc_txout> listunspent_by_address_and_amount(const std::string &address, double transfer_amount, const uint32_t minconf = 1, const uint32_t maxconf = 9999999) final;
std::string sendrawtransaction(const std::string &tx_hex); std::string sendrawtransaction(const std::string &tx_hex) final;
private: private:
bool is_test_net = false; bool is_test_net = false;
@ -145,13 +146,11 @@ private:
class zmq_listener_base { class zmq_listener_base {
public: public:
virtual ~zmq_listener_base(){}; virtual ~zmq_listener_base() = default;
zmq_listener_base(std::string _ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port = 0) { zmq_listener_base(const std::string &_ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port = 0) :
ip = _ip; ip(_ip),
block_zmq_port = _block_zmq_port; block_zmq_port(_block_zmq_port),
trx_zmq_port = _trx_zmq_port; trx_zmq_port(_trx_zmq_port){};
stopped = false;
};
virtual void start() = 0; virtual void start() = 0;
boost::signals2::signal<void(const block_data &)> block_event_received; boost::signals2::signal<void(const block_data &)> block_event_received;
boost::signals2::signal<void(const libbitcoin::chain::transaction &)> trx_event_received; boost::signals2::signal<void(const libbitcoin::chain::transaction &)> trx_event_received;
@ -160,16 +159,16 @@ protected:
std::string ip; std::string ip;
uint32_t block_zmq_port; uint32_t block_zmq_port;
uint32_t trx_zmq_port; uint32_t trx_zmq_port;
std::atomic_bool stopped; std::atomic_bool stopped{false};
std::thread block_thr; std::thread block_thr;
std::thread trx_thr; std::thread trx_thr;
}; };
class zmq_listener : public zmq_listener_base { class zmq_listener : public zmq_listener_base {
public: public:
zmq_listener(std::string _ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port = 0); zmq_listener(const std::string &_ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port = 0);
virtual ~zmq_listener(); ~zmq_listener() final;
void start(); void start() final;
private: private:
void handle_zmq(); void handle_zmq();
@ -181,9 +180,9 @@ private:
class zmq_listener_libbitcoin : public zmq_listener_base { class zmq_listener_libbitcoin : public zmq_listener_base {
public: public:
zmq_listener_libbitcoin(std::string _ip, uint32_t _block_zmq_port = 9093, uint32_t _trx_zmq_port = 9094); zmq_listener_libbitcoin(const std::string &_ip, uint32_t _block_zmq_port = 9093, uint32_t _trx_zmq_port = 9094);
virtual ~zmq_listener_libbitcoin(); ~zmq_listener_libbitcoin() final;
void start(); void start() final;
private: private:
void handle_block(); void handle_block();
@ -232,7 +231,7 @@ private:
uint32_t bitcoin_major_version; uint32_t bitcoin_major_version;
std::mutex event_handler_mutex; std::mutex event_handler_mutex;
typedef std::lock_guard<decltype(event_handler_mutex)> scoped_lock; using scoped_lock = std::lock_guard<decltype(event_handler_mutex)>;
std::string create_primary_wallet_address(const std::vector<son_sidechain_info> &son_pubkeys); std::string create_primary_wallet_address(const std::vector<son_sidechain_info> &son_pubkeys);
@ -244,11 +243,21 @@ private:
std::string sign_transaction(const sidechain_transaction_object &sto); std::string sign_transaction(const sidechain_transaction_object &sto);
std::string send_transaction(const sidechain_transaction_object &sto); std::string send_transaction(const sidechain_transaction_object &sto);
void extract_deposit(const std::vector<info_for_vin> &vins);
void block_handle_event(const block_data &event_data); void block_handle_event(const block_data &event_data);
void subscribe_address_thread();
void trx_handle_event(const libbitcoin::chain::transaction &event_data); void trx_handle_event(const libbitcoin::chain::transaction &event_data);
void subscription_expired_event(const std::string &address);
std::string get_redeemscript_for_userdeposit(const std::string &user_address); std::string get_redeemscript_for_userdeposit(const std::string &user_address);
void on_changed_objects(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts); void on_changed_objects(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
void on_changed_objects_cb(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts); void on_changed_objects_cb(const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts);
std::map<std::string, std::pair<std::unique_ptr<libbitcoin_client>, bool>> libbitcoin_clients;
void trx_event(const libbitcoin::chain::transaction &trx, const bool &is_test_net);
bool stop_sub_thr = true;
std::thread subscribe_thr;
}; };
}} // namespace graphene::peerplays_sidechain }} // namespace graphene::peerplays_sidechain

View file

@ -875,13 +875,10 @@ void peerplays_sidechain_plugin_impl::on_applied_block(const signed_block &b) {
if (first_block_skipped) { if (first_block_skipped) {
if (son_processing_enabled) { if (son_processing_enabled) {
schedule_son_processing(); schedule_son_processing();
} } else {
else
{
const fc::time_point now_fine = fc::time_point::now(); const fc::time_point now_fine = fc::time_point::now();
const fc::time_point_sec now = now_fine + fc::microseconds(500000); const fc::time_point_sec now = now_fine + fc::microseconds(500000);
if( plugin.database().get_slot_time(1) >= now ) if (plugin.database().get_slot_time(1) >= now) {
{
son_processing_enabled = true; son_processing_enabled = true;
schedule_son_processing(); schedule_son_processing();
} }

View file

@ -1,6 +1,7 @@
#include <graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp> #include <graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp>
#include <algorithm> #include <algorithm>
#include <thread>
#include <boost/algorithm/hex.hpp> #include <boost/algorithm/hex.hpp>
#include <boost/property_tree/json_parser.hpp> #include <boost/property_tree/json_parser.hpp>
@ -332,7 +333,7 @@ bool bitcoin_rpc_client::walletpassphrase(const std::string &passphrase, uint32_
else else
return true; return true;
} }
bitcoin_libbitcoin_client::bitcoin_libbitcoin_client(std::string url) : bitcoin_libbitcoin_client::bitcoin_libbitcoin_client(const std::string &url) :
libbitcoin_client(url) { libbitcoin_client(url) {
estimate_fee_ext = std::unique_ptr<estimate_fee_external>(new estimate_fee_external()); estimate_fee_ext = std::unique_ptr<estimate_fee_external>(new estimate_fee_external());
@ -426,7 +427,7 @@ btc_tx bitcoin_libbitcoin_client::getrawtransaction(const std::string &txid, con
std::string tx_hash; std::string tx_hash;
uint32_t confirmitions; uint32_t confirmitions;
libbitcoin::chain::output::list outs = get_transaction(txid, tx_hash, confirmitions); libbitcoin::chain::output::list outs = get_transaction_outs(txid, tx_hash, confirmitions);
if (tx_hash.empty()) { if (tx_hash.empty()) {
return tx; return tx;
@ -507,7 +508,7 @@ uint64_t bitcoin_rpc_client::ping(rpc_connection &conn) const {
// ============================================================================= // =============================================================================
zmq_listener::zmq_listener(std::string _ip, uint32_t _zmq_block_port, uint32_t _zmq_trx_port) : zmq_listener::zmq_listener(const std::string &_ip, uint32_t _zmq_block_port, uint32_t _zmq_trx_port) :
zmq_listener_base(_ip, _zmq_block_port, _zmq_trx_port), zmq_listener_base(_ip, _zmq_block_port, _zmq_trx_port),
ctx(1), ctx(1),
socket(ctx, ZMQ_SUB) { socket(ctx, ZMQ_SUB) {
@ -577,7 +578,7 @@ void zmq_listener::handle_zmq() {
// ============================================================================= // =============================================================================
zmq_listener_libbitcoin::zmq_listener_libbitcoin(std::string _ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port) : zmq_listener_libbitcoin::zmq_listener_libbitcoin(const std::string &_ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port) :
zmq_listener_base(_ip, _block_zmq_port, _trx_zmq_port), zmq_listener_base(_ip, _block_zmq_port, _trx_zmq_port),
block_socket(block_context, libbitcoin::protocol::zmq::socket::role::subscriber), block_socket(block_context, libbitcoin::protocol::zmq::socket::role::subscriber),
trx_socket(trx_context, libbitcoin::protocol::zmq::socket::role::subscriber) { trx_socket(trx_context, libbitcoin::protocol::zmq::socket::role::subscriber) {
@ -726,10 +727,19 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
} }
listener = std::unique_ptr<zmq_listener>(new zmq_listener(ips[0], bitcoin_node_zmq_port)); listener = std::unique_ptr<zmq_listener>(new zmq_listener(ips[0], bitcoin_node_zmq_port));
listener->block_event_received.connect([this](const block_data &block_event_data) {
std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach();
});
listener->trx_event_received.connect([this](const libbitcoin::chain::transaction &trx_event_data) {
std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach();
});
listener->start();
} else { } else {
bitcoin_client = std::unique_ptr<bitcoin_libbitcoin_client>(new bitcoin_libbitcoin_client(libbitcoin_server_ip)); bitcoin_client = std::unique_ptr<bitcoin_libbitcoin_client>(new bitcoin_libbitcoin_client(libbitcoin_server_ip));
stop_sub_thr = false;
listener = std::unique_ptr<zmq_listener_libbitcoin>(new zmq_listener_libbitcoin(libbitcoin_server_ip, libbitcoin_block_zmq_port, libbitcoin_trx_zmq_port)); subscribe_thr = std::thread(&sidechain_net_handler_bitcoin::subscribe_address_thread, this);
} }
std::string chain_info = bitcoin_client->getblockchaininfo(); std::string chain_info = bitcoin_client->getblockchaininfo();
@ -745,16 +755,6 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
bitcoin_client->getnetworkinfo(); bitcoin_client->getnetworkinfo();
listener->block_event_received.connect([this](const block_data &block_event_data) {
std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach();
});
listener->trx_event_received.connect([this](const libbitcoin::chain::transaction &trx_event_data) {
std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach();
});
listener->start();
database.changed_objects.connect([this](const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts) { database.changed_objects.connect([this](const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts) {
on_changed_objects(ids, accounts); on_changed_objects(ids, accounts);
}); });
@ -765,6 +765,11 @@ sidechain_net_handler_bitcoin::~sidechain_net_handler_bitcoin() {
if (on_changed_objects_task.valid()) { if (on_changed_objects_task.valid()) {
on_changed_objects_task.cancel_and_wait(__FUNCTION__); on_changed_objects_task.cancel_and_wait(__FUNCTION__);
} }
if (subscribe_thr.joinable()) {
stop_sub_thr = true;
subscribe_thr.join();
}
} catch (fc::canceled_exception &) { } catch (fc::canceled_exception &) {
// Expected exception. Move along. // Expected exception. Move along.
} catch (fc::exception &e) { } catch (fc::exception &e) {
@ -1655,15 +1660,84 @@ std::string sidechain_net_handler_bitcoin::send_transaction(const sidechain_tran
return tx.get_txid().str(); return tx.get_txid().str();
} }
void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) { void sidechain_net_handler_bitcoin::subscribe_address_thread() {
while (!stop_sub_thr) {
auto vins = bitcoin_client->getblock(event_data); const auto &sidechain_addresses_idx = database.get_index_type<sidechain_address_index>();
const auto &sidechain_addresses_by_sidechain_idx = sidechain_addresses_idx.indices().get<by_sidechain>();
add_to_son_listener_log("BLOCK : " + event_data.block_hash); const auto &sidechain_addresses_by_sidechain_range = sidechain_addresses_by_sidechain_idx.equal_range(sidechain);
std::for_each(sidechain_addresses_by_sidechain_range.first, sidechain_addresses_by_sidechain_range.second,
[&](const sidechain_address_object &sao) {
scoped_lock interlock(event_handler_mutex); scoped_lock interlock(event_handler_mutex);
const auto &sidechain_addresses_idx = database.get_index_type<sidechain_address_index>().indices().get<by_sidechain_and_deposit_address_and_expires>(); if (!sao.deposit_address.empty() && (!libbitcoin_clients[sao.deposit_address].first || !libbitcoin_clients[sao.deposit_address].second)) {
if (sao.expires > database.head_block_time()) {
if (!libbitcoin_clients[sao.deposit_address].first) {
libbitcoin_clients[sao.deposit_address] = std::make_pair(std::unique_ptr<libbitcoin_client>(new libbitcoin_client(libbitcoin_server_ip)), true);
}
auto trx_event_callback = std::bind(&sidechain_net_handler_bitcoin::trx_event, this, std::placeholders::_1, std::placeholders::_2);
auto sub_expired_callback = std::bind(&sidechain_net_handler_bitcoin::subscription_expired_event, this, std::placeholders::_1);
libbitcoin_clients[sao.deposit_address].first->subscribe_to_address(sao.deposit_address, trx_event_callback, sub_expired_callback);
libbitcoin_clients[sao.deposit_address].second = true;
}
}
});
std::this_thread::sleep_for(std::chrono::milliseconds(SUBSCRIPTION_THREAD_INTERVAL));
}
ilog("Exit from subsription thread ....");
}
void sidechain_net_handler_bitcoin::subscription_expired_event(const std::string &address) {
scoped_lock interlock(event_handler_mutex);
if (libbitcoin_clients[address].first && libbitcoin_clients[address].first->is_target_trxs_pool_empty()) {
libbitcoin_clients.erase(address);
libbitcoin_clients[address].first = nullptr;
} else {
libbitcoin_clients[address].second = false;
}
}
void sidechain_net_handler_bitcoin::trx_event(const libbitcoin::chain::transaction &trx, const bool &is_test_net) {
scoped_lock interlock(event_handler_mutex);
std::vector<info_for_vin> result;
uint32_t vout_seq = 0;
for (const auto &o : trx.outputs()) {
std::vector<std::string> address_list;
libbitcoin::wallet::payment_address::list addresses;
if (is_test_net) {
addresses = o.addresses(libbitcoin::wallet::payment_address::testnet_p2kh,
libbitcoin::wallet::payment_address::testnet_p2sh);
} else {
addresses = o.addresses();
}
for (auto &payment_address : addresses) {
std::stringstream ss;
ss << payment_address;
address_list.emplace_back(ss.str());
}
// addres list consists usual of one element
for (auto &address : address_list) {
const auto address_base58 = address;
info_for_vin vin;
vin.out.hash_tx = libbitcoin::config::hash256(trx.hash()).to_string();
vin.out.amount = std::floor(o.value());
vin.out.n_vout = vout_seq;
vin.address = address_base58;
result.push_back(vin);
}
vout_seq++;
}
extract_deposit(result);
}
void sidechain_net_handler_bitcoin::extract_deposit(const std::vector<info_for_vin> &vins) {
const auto &sidechain_addresses_idx = database.get_index_type<sidechain_address_index>().indices().get<by_sidechain_and_deposit_address_and_expires>();
for (const auto &v : vins) { for (const auto &v : vins) {
// !!! EXTRACT DEPOSIT ADDRESS FROM SIDECHAIN ADDRESS OBJECT // !!! EXTRACT DEPOSIT ADDRESS FROM SIDECHAIN ADDRESS OBJECT
const auto &addr_itr = sidechain_addresses_idx.find(std::make_tuple(sidechain, v.address, time_point_sec::maximum())); const auto &addr_itr = sidechain_addresses_idx.find(std::make_tuple(sidechain, v.address, time_point_sec::maximum()));
@ -1697,6 +1771,13 @@ void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_d
} }
} }
void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) {
auto vins = bitcoin_client->getblock(event_data);
add_to_son_listener_log("BLOCK : " + event_data.block_hash);
scoped_lock interlock(event_handler_mutex);
extract_deposit(vins);
}
void sidechain_net_handler_bitcoin::trx_handle_event(const libbitcoin::chain::transaction &trx_data) { void sidechain_net_handler_bitcoin::trx_handle_event(const libbitcoin::chain::transaction &trx_data) {
bitcoin_client->import_trx_to_memory_pool(trx_data); bitcoin_client->import_trx_to_memory_pool(trx_data);
} }