DO NOT MERGE Adding logs for receiving blocks #805
2 changed files with 37 additions and 7 deletions
|
|
@ -195,6 +195,9 @@ private:
|
|||
libbitcoin::protocol::zmq::context trx_context;
|
||||
libbitcoin::protocol::zmq::socket trx_socket;
|
||||
libbitcoin::protocol::zmq::poller trx_poller;
|
||||
|
||||
zmq::context_t ctx;
|
||||
zmq::socket_t socket;
|
||||
};
|
||||
|
||||
// =============================================================================
|
||||
|
|
|
|||
|
|
@ -515,7 +515,7 @@ zmq_listener::zmq_listener(std::string _ip, uint32_t _zmq_block_port, uint32_t _
|
|||
|
||||
void zmq_listener::start() {
|
||||
int linger = 0;
|
||||
auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "hashblock", 9);
|
||||
auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
|
||||
FC_ASSERT(0 == rc);
|
||||
rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
|
||||
FC_ASSERT(0 == rc);
|
||||
|
|
@ -524,7 +524,7 @@ void zmq_listener::start() {
|
|||
// socket.setsockopt( ZMQ_SUBSCRIBE, "hashtx", 6 );
|
||||
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawblock", 8 );
|
||||
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawtx", 5 );
|
||||
socket.connect("tcp://" + ip + ":" + std::to_string(block_zmq_port));
|
||||
socket.connect("tcp://" + ip + ":" + std::to_string(9093));
|
||||
|
||||
block_thr = std::thread(&zmq_listener::handle_zmq, this);
|
||||
|
||||
|
|
@ -555,12 +555,15 @@ void zmq_listener::handle_zmq() {
|
|||
std::vector<zmq::message_t> msg;
|
||||
auto res = zmq::recv_multipart(socket, std::back_inserter(msg));
|
||||
if (res.has_value()) {
|
||||
wlog("=====>>>> HERE IS RECEIVED ******************");
|
||||
if (3 != *res) {
|
||||
elog("zmq::recv_multipart returned: ${res}", ("res", *res));
|
||||
throw zmq::error_t();
|
||||
}
|
||||
const auto header = std::string(static_cast<char *>(msg[0].data()), msg[0].size());
|
||||
const auto block_hash = boost::algorithm::hex(std::string(static_cast<char *>(msg[1].data()), msg[1].size()));
|
||||
|
||||
wlog("The size received is: ${size1}, ${size2}, ${size3}", ("size1", msg[0].size()) ("size2" , msg[1].size()) ("size3", msg[2].size()));
|
||||
block_data event_data;
|
||||
event_data.block_hash = block_hash;
|
||||
block_event_received(event_data);
|
||||
|
|
@ -579,6 +582,8 @@ void zmq_listener::handle_zmq() {
|
|||
|
||||
zmq_listener_libbitcoin::zmq_listener_libbitcoin(std::string _ip, uint32_t _block_zmq_port, uint32_t _trx_zmq_port) :
|
||||
zmq_listener_base(_ip, _block_zmq_port, _trx_zmq_port),
|
||||
ctx(1),
|
||||
socket(ctx, ZMQ_SUB),
|
||||
block_socket(block_context, libbitcoin::protocol::zmq::socket::role::subscriber),
|
||||
trx_socket(trx_context, libbitcoin::protocol::zmq::socket::role::subscriber) {
|
||||
}
|
||||
|
|
@ -595,7 +600,7 @@ void zmq_listener_libbitcoin::start() {
|
|||
libbitcoin::config::endpoint block_address(endpoint_address, block_zmq_port);
|
||||
libbitcoin::config::endpoint trx_address(endpoint_address, trx_zmq_port);
|
||||
|
||||
block_socket.connect(block_address);
|
||||
// block_socket.connect(block_address);
|
||||
trx_socket.connect(trx_address);
|
||||
|
||||
block_thr = std::thread(&zmq_listener_libbitcoin::handle_block, this);
|
||||
|
|
@ -630,15 +635,32 @@ void zmq_listener_libbitcoin::handle_trx() {
|
|||
}
|
||||
|
||||
void zmq_listener_libbitcoin::handle_block() {
|
||||
block_poller.add(block_socket);
|
||||
|
||||
int linger = 0;
|
||||
auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
|
||||
FC_ASSERT(0 == rc);
|
||||
rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
|
||||
FC_ASSERT(0 == rc);
|
||||
int timeout = 100; // millisec
|
||||
rc = zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
|
||||
// socket.setsockopt( ZMQ_SUBSCRIBE, "hashtx", 6 );
|
||||
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawblock", 8 );
|
||||
// socket.setsockopt( ZMQ_SUBSCRIBE, "rawtx", 5 );
|
||||
socket.connect("tcp://" + ip + ":" + std::to_string(block_zmq_port));
|
||||
|
||||
libbitcoin::protocol::zmq::socket test_block_socket(&socket);
|
||||
|
||||
block_poller.add(test_block_socket);
|
||||
|
||||
while (!stopped.load()) {
|
||||
const auto identifiers = block_poller.wait(500);
|
||||
|
||||
if (identifiers.contains(block_socket.id())) {
|
||||
if (identifiers.contains(test_block_socket.id())) {
|
||||
libbitcoin::protocol::zmq::message message;
|
||||
|
||||
block_socket.receive(message);
|
||||
wlog("BLOCK IS RCCCCCCCC");
|
||||
|
||||
test_block_socket.receive(message);
|
||||
|
||||
std::vector<uint8_t> data;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
|
|
@ -729,7 +751,8 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
|
|||
} else {
|
||||
bitcoin_client = std::unique_ptr<bitcoin_libbitcoin_client>(new bitcoin_libbitcoin_client(libbitcoin_server_ip));
|
||||
|
||||
listener = std::unique_ptr<zmq_listener_libbitcoin>(new zmq_listener_libbitcoin(libbitcoin_server_ip, libbitcoin_block_zmq_port, libbitcoin_trx_zmq_port));
|
||||
listener = std::unique_ptr<zmq_listener>(new zmq_listener(libbitcoin_server_ip, libbitcoin_block_zmq_port));
|
||||
// listener = std::unique_ptr<zmq_listener_libbitcoin>(new zmq_listener_libbitcoin(libbitcoin_server_ip, libbitcoin_block_zmq_port, libbitcoin_trx_zmq_port));
|
||||
}
|
||||
|
||||
std::string chain_info = bitcoin_client->getblockchaininfo();
|
||||
|
|
@ -1657,6 +1680,10 @@ std::string sidechain_net_handler_bitcoin::send_transaction(const sidechain_tran
|
|||
|
||||
void sidechain_net_handler_bitcoin::block_handle_event(const block_data &event_data) {
|
||||
|
||||
// Only for test
|
||||
wlog("====> BLOCK IS RECEIVED <=====");
|
||||
return;
|
||||
|
||||
auto vins = bitcoin_client->getblock(event_data);
|
||||
|
||||
add_to_son_listener_log("BLOCK : " + event_data.block_hash);
|
||||
|
|
|
|||
Loading…
Reference in a new issue