Draft: Local test block events #804

Open
hirunda wants to merge 2 commits from local_test_block_events into develop
2 changed files with 30 additions and 6 deletions

View file

@ -195,6 +195,7 @@ private:
libbitcoin::protocol::zmq::context trx_context;
libbitcoin::protocol::zmq::socket trx_socket;
libbitcoin::protocol::zmq::poller trx_poller;
libbitcoin::protocol::zmq::poller common_poller;
};
// =============================================================================

View file

@ -359,6 +359,8 @@ std::vector<info_for_vin> bitcoin_libbitcoin_client::getblock(const block_data &
std::unique_lock<std::mutex> lck(libbitcoin_event_mutex);
wlog("RECEIVED BLOCK FROM LIBBITCOIN : ${block_hash}", ("block_hash" , block.block_hash));
// estimate fee
const auto &block_trxs = block.block.transactions();
std::vector<libbitcoin::chain::transaction> bucket_trxs;
@ -585,6 +587,8 @@ zmq_listener_libbitcoin::zmq_listener_libbitcoin(std::string _ip, uint32_t _bloc
zmq_listener_libbitcoin::~zmq_listener_libbitcoin() {
stopped.store(true);
trx_socket.stop();
block_socket.stop();
block_thr.join();
trx_thr.join();
}
@ -598,15 +602,18 @@ void zmq_listener_libbitcoin::start() {
block_socket.connect(block_address);
trx_socket.connect(trx_address);
block_thr = std::thread(&zmq_listener_libbitcoin::handle_block, this);
common_poller.add(block_socket);
common_poller.add(trx_socket);
// block_thr = std::thread(&zmq_listener_libbitcoin::handle_block, this);
trx_thr = std::thread(&zmq_listener_libbitcoin::handle_trx, this);
}
void zmq_listener_libbitcoin::handle_trx() {
trx_poller.add(trx_socket);
// trx_poller.add(trx_socket);
while (!stopped.load()) {
const auto identifiers = trx_poller.wait(500);
const auto identifiers = common_poller.wait(500);
if (identifiers.contains(trx_socket.id())) {
libbitcoin::protocol::zmq::message message;
@ -623,6 +630,23 @@ void zmq_listener_libbitcoin::handle_trx() {
trx.from_data(data, true);
trx_event_received(trx);
} else if (identifiers.contains(block_socket.id())) {
libbitcoin::protocol::zmq::message message;
block_socket.receive(message);
std::vector<uint8_t> data;
for (int i = 0; i < 3; i++) {
data.clear();
message.dequeue(data);
}
libbitcoin::chain::block block;
block.from_data(data, true);
block_data event_data;
event_data.block_hash = libbitcoin::config::hash256(block.hash()).to_string();
event_data.block = std::move(block);
block_event_received(event_data);
}
}
@ -630,14 +654,13 @@ void zmq_listener_libbitcoin::handle_trx() {
}
void zmq_listener_libbitcoin::handle_block() {
block_poller.add(block_socket);
// block_poller.add(block_socket);
while (!stopped.load()) {
const auto identifiers = block_poller.wait(500);
const auto identifiers = common_poller.wait(500);
if (identifiers.contains(block_socket.id())) {
libbitcoin::protocol::zmq::message message;
block_socket.receive(message);
std::vector<uint8_t> data;