DO NOT MERGE Adding logs for receiving blocks #805

Closed
hirunda wants to merge 1 commit from local_dev_manual_zmq_socet into develop
2 changed files with 37 additions and 7 deletions

View file

@ -195,6 +195,9 @@ private:
libbitcoin::protocol::zmq::context trx_context; libbitcoin::protocol::zmq::context trx_context;
libbitcoin::protocol::zmq::socket trx_socket; libbitcoin::protocol::zmq::socket trx_socket;
libbitcoin::protocol::zmq::poller trx_poller; libbitcoin::protocol::zmq::poller trx_poller;
zmq::context_t ctx;
zmq::socket_t socket;
}; };
// ============================================================================= // =============================================================================

View file

@ -515,7 +515,7 @@ zmq_listener::zmq_listener(std::string _ip, uint32_t _zmq_block_port, uint32_t _
void zmq_listener::start() { void zmq_listener::start() {
int linger = 0; int linger = 0;
auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "hashblock", 9); auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
FC_ASSERT(0 == rc); FC_ASSERT(0 == rc);
rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)); rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
FC_ASSERT(0 == rc); FC_ASSERT(0 == rc);
@ -524,7 +524,7 @@ void zmq_listener::start() {
// socket.setsockopt( ZMQ_SUBSCRIBE, "hashtx", 6 ); // socket.setsockopt( ZMQ_SUBSCRIBE, "hashtx", 6 );
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawblock", 8 ); // socket.setsockopt( ZMQ_SUBSCRIBE, "rawblock", 8 );
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawtx", 5 ); // socket.setsockopt( ZMQ_SUBSCRIBE, "rawtx", 5 );
socket.connect("tcp://" + ip + ":" + std::to_string(block_zmq_port)); socket.connect("tcp://" + ip + ":" + std::to_string(9093));
block_thr = std::thread(&zmq_listener::handle_zmq, this); block_thr = std::thread(&zmq_listener::handle_zmq, this);
@ -555,12 +555,15 @@ void zmq_listener::handle_zmq() {
std::vector<zmq::message_t> msg; std::vector<zmq::message_t> msg;
auto res = zmq::recv_multipart(socket, std::back_inserter(msg)); auto res = zmq::recv_multipart(socket, std::back_inserter(msg));
if (res.has_value()) { if (res.has_value()) {
wlog("=====>>>> HERE IS RECEIVED ******************");
if (3 != *res) { if (3 != *res) {
elog("zmq::recv_multipart returned: ${res}", ("res", *res)); elog("zmq::recv_multipart returned: ${res}", ("res", *res));
throw zmq::error_t(); throw zmq::error_t();
} }
const auto header = std::string(static_cast<char *>(msg[0].data()), msg[0].size()); const auto header = std::string(static_cast<char *>(msg[0].data()), msg[0].size());
const auto block_hash = boost::algorithm::hex(std::string(static_cast<char *>(msg[1].data()), msg[1].size())); const auto block_hash = boost::algorithm::hex(std::string(static_cast<char *>(msg[1].data()), msg[1].size()));
wlog("The size received is: ${size1}, ${size2}, ${size3}", ("size1", msg[0].size()) ("size2" , msg[1].size()) ("size3", msg[2].size()));
block_data event_data; block_data event_data;
event_data.block_hash = block_hash; event_data.block_hash = block_hash;
block_event_received(event_data); block_event_received(event_data);
@ -579,6 +582,8 @@ void zmq_listener::handle_zmq() {
zmq_listener_libbitcoin::zmq_listener_libbitcoin(std::string _ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port) : zmq_listener_libbitcoin::zmq_listener_libbitcoin(std::string _ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port) :
zmq_listener_base(_ip, _block_zmq_port, _trx_zmq_port), zmq_listener_base(_ip, _block_zmq_port, _trx_zmq_port),
ctx(1),
socket(ctx, ZMQ_SUB),
block_socket(block_context, libbitcoin::protocol::zmq::socket::role::subscriber), block_socket(block_context, libbitcoin::protocol::zmq::socket::role::subscriber),
trx_socket(trx_context, libbitcoin::protocol::zmq::socket::role::subscriber) { trx_socket(trx_context, libbitcoin::protocol::zmq::socket::role::subscriber) {
} }
@ -595,7 +600,7 @@ void zmq_listener_libbitcoin::start() {
libbitcoin::config::endpoint block_address(endpoint_address, block_zmq_port); libbitcoin::config::endpoint block_address(endpoint_address, block_zmq_port);
libbitcoin::config::endpoint trx_address(endpoint_address, trx_zmq_port); libbitcoin::config::endpoint trx_address(endpoint_address, trx_zmq_port);
block_socket.connect(block_address); // block_socket.connect(block_address);
trx_socket.connect(trx_address); trx_socket.connect(trx_address);
block_thr = std::thread(&zmq_listener_libbitcoin::handle_block, this); block_thr = std::thread(&zmq_listener_libbitcoin::handle_block, this);
@ -630,15 +635,32 @@ void zmq_listener_libbitcoin::handle_trx() {
} }
void zmq_listener_libbitcoin::handle_block() { void zmq_listener_libbitcoin::handle_block() {
block_poller.add(block_socket);
int linger = 0;
auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
FC_ASSERT(0 == rc);
rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
FC_ASSERT(0 == rc);
int timeout = 100; // millisec
rc = zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
// socket.setsockopt( ZMQ_SUBSCRIBE, "hashtx", 6 );
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawblock", 8 );
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawtx", 5 );
socket.connect("tcp://" + ip + ":" + std::to_string(block_zmq_port));
libbitcoin::protocol::zmq::socket test_block_socket(&socket);
block_poller.add(test_block_socket);
while (!stopped.load()) { while (!stopped.load()) {
const auto identifiers = block_poller.wait(500); const auto identifiers = block_poller.wait(500);
if (identifiers.contains(block_socket.id())) { if (identifiers.contains(test_block_socket.id())) {
libbitcoin::protocol::zmq::message message; libbitcoin::protocol::zmq::message message;
block_socket.receive(message); wlog("BLOCK IS RCCCCCCCC");
test_block_socket.receive(message);
std::vector<uint8_t> data; std::vector<uint8_t> data;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
@ -729,7 +751,8 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
} else { } else {
bitcoin_client = std::unique_ptr<bitcoin_libbitcoin_client>(new bitcoin_libbitcoin_client(libbitcoin_server_ip)); bitcoin_client = std::unique_ptr<bitcoin_libbitcoin_client>(new bitcoin_libbitcoin_client(libbitcoin_server_ip));
listener = std::unique_ptr<zmq_listener_libbitcoin>(new zmq_listener_libbitcoin(libbitcoin_server_ip, libbitcoin_block_zmq_port, libbitcoin_trx_zmq_port)); listener = std::unique_ptr<zmq_listener>(new zmq_listener(libbitcoin_server_ip, libbitcoin_block_zmq_port));
// listener = std::unique_ptr<zmq_listener_libbitcoin>(new zmq_listener_libbitcoin(libbitcoin_server_ip, libbitcoin_block_zmq_port, libbitcoin_trx_zmq_port));
} }
std::string chain_info = bitcoin_client->getblockchaininfo(); std::string chain_info = bitcoin_client->getblockchaininfo();
@ -1657,6 +1680,10 @@ std::string sidechain_net_handler_bitcoin::send_transaction(const sidechain_tran
void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) { void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) {
// Only for test
wlog("====> BLOCK IS RECEIVED <=====");
return;
auto vins = bitcoin_client->getblock(event_data); auto vins = bitcoin_client->getblock(event_data);
add_to_son_listener_log("BLOCK : " + event_data.block_hash); add_to_son_listener_log("BLOCK : " + event_data.block_hash);