bug fix 388: add ZMQ_RCVTIMEO, graceful thread shutdown

This commit is contained in:
Pavel Baykov 2022-06-13 17:03:20 -03:00
parent 29189b3897
commit 9dd0747e5d
2 changed files with 46 additions and 18 deletions

View file

@ -2,6 +2,7 @@
#include <graphene/peerplays_sidechain/sidechain_net_handler.hpp>
#include <thread>
#include <string>
#include <zmq_addon.hpp>
@ -90,7 +91,9 @@ private:
class zmq_listener {
public:
zmq_listener(std::string _ip, uint32_t _zmq);
virtual ~zmq_listener();
void start();
boost::signals2::signal<void(const std::string &)> event_received;
private:
@ -102,6 +105,9 @@ private:
zmq::context_t ctx;
zmq::socket_t socket;
std::atomic_bool stopped;
std::thread thr;
};
// =============================================================================

View file

@ -1,7 +1,6 @@
#include <graphene/peerplays_sidechain/sidechain_net_handler_bitcoin.hpp>
#include <algorithm>
#include <thread>
#include <boost/algorithm/hex.hpp>
#include <boost/property_tree/json_parser.hpp>
@ -1060,8 +1059,31 @@ zmq_listener::zmq_listener(std::string _ip, uint32_t _zmq) :
ip(_ip),
zmq_port(_zmq),
ctx(1),
socket(ctx, ZMQ_SUB) {
std::thread(&zmq_listener::handle_zmq, this).detach();
socket(ctx, ZMQ_SUB),
stopped(false) {
}
void zmq_listener::start() {
int linger = 0;
auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "hashblock", 9);
FC_ASSERT(0 == rc);
rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
FC_ASSERT(0 == rc);
int timeout = 100; //millisec
rc = zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
//socket.setsockopt( ZMQ_SUBSCRIBE, "hashtx", 6 );
//socket.setsockopt( ZMQ_SUBSCRIBE, "rawblock", 8 );
//socket.setsockopt( ZMQ_SUBSCRIBE, "rawtx", 5 );
socket.connect("tcp://" + ip + ":" + std::to_string(zmq_port));
thr = std::thread(&zmq_listener::handle_zmq, this);
ilog("zmq_listener thread started");
}
zmq_listener::~zmq_listener() {
stopped = true;
thr.join();
}
std::vector<zmq::message_t> zmq_listener::receive_multipart() {
@ -1078,26 +1100,25 @@ std::vector<zmq::message_t> zmq_listener::receive_multipart() {
}
void zmq_listener::handle_zmq() {
int linger = 0;
auto rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "hashblock", 9);
FC_ASSERT(0 == rc);
rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
FC_ASSERT(0 == rc);
//socket.setsockopt( ZMQ_SUBSCRIBE, "hashtx", 6 );
//socket.setsockopt( ZMQ_SUBSCRIBE, "rawblock", 8 );
//socket.setsockopt( ZMQ_SUBSCRIBE, "rawtx", 5 );
socket.connect("tcp://" + ip + ":" + std::to_string(zmq_port));
while (true) {
while (false == stopped) {
try {
auto msg = receive_multipart();
const auto header = std::string(static_cast<char *>(msg[0].data()), msg[0].size());
const auto block_hash = boost::algorithm::hex(std::string(static_cast<char *>(msg[1].data()), msg[1].size()));
event_received(block_hash);
std::vector<zmq::message_t> msg;
auto res = zmq::recv_multipart(socket, std::back_inserter(msg));
if (res.has_value()){
if (3 != *res) {
elog("zmq::recv_multipart returned: ${res}", ("res", *res));
throw zmq::error_t();
}
const auto header = std::string(static_cast<char *>(msg[0].data()), msg[0].size());
const auto block_hash = boost::algorithm::hex(std::string(static_cast<char *>(msg[1].data()), msg[1].size()));
event_received(block_hash);
}
} catch (zmq::error_t &e) {
elog("handle_zmq recv_multipart exception ${str}", ("str", e.what()));
}
}
ilog("zmq_listener thread finished");
}
// =============================================================================
@ -1173,6 +1194,7 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
ilog("Bitcoin major version is: '${version}'", ("version", bitcoin_major_version));
listener = std::unique_ptr<zmq_listener>(new zmq_listener(ip, zmq_port));
listener->start();
listener->event_received.connect([this](const std::string &event_data) {
std::thread(&sidechain_net_handler_bitcoin::handle_event, this, event_data).detach();
});