Fixes for most of the Valgrind errors related to uninitialized values
Also includes additional debug code (related to multi-threaded access in read_loop).
This commit is contained in:
parent
a9de7c1da3
commit
b5c8cd0ad3
4 changed files with 64 additions and 32 deletions
|
|
@ -39,8 +39,8 @@ namespace graphene { namespace net {
|
|||
*/
|
||||
struct message_header
|
||||
{
|
||||
uint32_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE
|
||||
uint32_t msg_type; // every channel gets a 16 bit message type specifier
|
||||
uint32_t size = 0; // number of bytes in message, capped at MAX_MESSAGE_SIZE
|
||||
uint32_t msg_type = 0; // every channel gets a 16 bit message type specifier
|
||||
};
|
||||
|
||||
typedef fc::uint160_t message_hash_type;
|
||||
|
|
|
|||
|
|
@ -166,25 +166,25 @@ namespace graphene { namespace net
|
|||
};
|
||||
|
||||
|
||||
size_t _total_queued_messages_size;
|
||||
size_t _total_queued_messages_size = 0;
|
||||
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
|
||||
fc::future<void> _send_queued_messages_done;
|
||||
public:
|
||||
fc::time_point connection_initiation_time;
|
||||
fc::time_point connection_closed_time;
|
||||
fc::time_point connection_terminated_time;
|
||||
peer_connection_direction direction;
|
||||
peer_connection_direction direction = peer_connection_direction::unknown;
|
||||
//connection_state state;
|
||||
firewalled_state is_firewalled;
|
||||
firewalled_state is_firewalled = firewalled_state::unknown;
|
||||
fc::microseconds clock_offset;
|
||||
fc::microseconds round_trip_delay;
|
||||
|
||||
our_connection_state our_state;
|
||||
bool they_have_requested_close;
|
||||
their_connection_state their_state;
|
||||
bool we_have_requested_close;
|
||||
our_connection_state our_state = our_connection_state::disconnected;
|
||||
bool they_have_requested_close = false;
|
||||
their_connection_state their_state = their_connection_state::disconnected;
|
||||
bool we_have_requested_close = false;
|
||||
|
||||
connection_negotiation_status negotiation_status;
|
||||
connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected;
|
||||
fc::oexception connection_closed_error;
|
||||
|
||||
fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); }
|
||||
|
|
@ -199,7 +199,7 @@ namespace graphene { namespace net
|
|||
* from the user_data field of the hello, or if none is present it will be filled with a
|
||||
* copy of node_public_key */
|
||||
node_id_t node_id;
|
||||
uint32_t core_protocol_version;
|
||||
uint32_t core_protocol_version = 0;
|
||||
std::string user_agent;
|
||||
fc::optional<std::string> graphene_git_revision_sha;
|
||||
fc::optional<fc::time_point_sec> graphene_git_revision_unix_timestamp;
|
||||
|
|
@ -212,8 +212,8 @@ namespace graphene { namespace net
|
|||
// its hello message. For outbound, they record what we sent the peer
|
||||
// in our hello message
|
||||
fc::ip::address inbound_address;
|
||||
uint16_t inbound_port;
|
||||
uint16_t outbound_port;
|
||||
uint16_t inbound_port = 0;
|
||||
uint16_t outbound_port = 0;
|
||||
/// @}
|
||||
|
||||
typedef std::unordered_map<item_id, fc::time_point> item_to_time_map_type;
|
||||
|
|
@ -222,14 +222,14 @@ namespace graphene { namespace net
|
|||
/// @{
|
||||
boost::container::deque<item_hash_t> ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about
|
||||
std::set<item_hash_t> ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing
|
||||
uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
|
||||
bool peer_needs_sync_items_from_us;
|
||||
bool we_need_sync_items_from_peer;
|
||||
uint32_t number_of_unfetched_item_ids = 0; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
|
||||
bool peer_needs_sync_items_from_us = false;
|
||||
bool we_need_sync_items_from_peer = false;
|
||||
fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
|
||||
item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
|
||||
item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
|
||||
fc::time_point_sec last_block_time_delegate_has_seen;
|
||||
bool inhibit_fetching_sync_blocks;
|
||||
bool inhibit_fetching_sync_blocks = false;
|
||||
/// @}
|
||||
|
||||
/// non-synchronization state data
|
||||
|
|
@ -259,17 +259,17 @@ namespace graphene { namespace net
|
|||
// blockchain catch up
|
||||
fc::time_point transaction_fetching_inhibited_until;
|
||||
|
||||
uint32_t last_known_fork_block_number;
|
||||
uint32_t last_known_fork_block_number = 0;
|
||||
|
||||
fc::future<void> accept_or_connect_task_done;
|
||||
|
||||
firewall_check_state_data *firewall_check_state;
|
||||
firewall_check_state_data *firewall_check_state = nullptr;
|
||||
#ifndef NDEBUG
|
||||
private:
|
||||
fc::thread* _thread;
|
||||
unsigned _send_message_queue_tasks_running; // temporary debugging
|
||||
fc::thread* _thread = nullptr;
|
||||
unsigned _send_message_queue_tasks_running = 0; // temporary debugging
|
||||
#endif
|
||||
bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system
|
||||
bool _currently_handling_message = false; // true while we're in the middle of handling a message from the remote system
|
||||
private:
|
||||
peer_connection(peer_connection_delegate* delegate);
|
||||
void destroy();
|
||||
|
|
|
|||
|
|
@ -32,6 +32,8 @@
|
|||
#include <graphene/net/stcp_socket.hpp>
|
||||
#include <graphene/net/config.hpp>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#ifdef DEFAULT_LOGGER
|
||||
# undef DEFAULT_LOGGER
|
||||
#endif
|
||||
|
|
@ -61,7 +63,7 @@ namespace graphene { namespace net {
|
|||
fc::time_point _last_message_sent_time;
|
||||
|
||||
bool _send_message_in_progress;
|
||||
|
||||
std::atomic_bool readLoopInProgress;
|
||||
#ifndef NDEBUG
|
||||
fc::thread* _thread;
|
||||
#endif
|
||||
|
|
@ -97,7 +99,8 @@ namespace graphene { namespace net {
|
|||
_delegate(delegate),
|
||||
_bytes_received(0),
|
||||
_bytes_sent(0),
|
||||
_send_message_in_progress(false)
|
||||
_send_message_in_progress(false),
|
||||
readLoopInProgress(false)
|
||||
#ifndef NDEBUG
|
||||
,_thread(&fc::thread::current())
|
||||
#endif
|
||||
|
|
@ -137,6 +140,20 @@ namespace graphene { namespace net {
|
|||
_sock.bind(local_endpoint);
|
||||
}
|
||||
|
||||
class THelper final
|
||||
{
|
||||
std::atomic_bool* Flag;
|
||||
public:
|
||||
explicit THelper(std::atomic_bool* flag) : Flag(flag)
|
||||
{
|
||||
FC_ASSERT(*flag == false, "Only one thread at time can visit it");
|
||||
*flag = true;
|
||||
}
|
||||
~THelper()
|
||||
{
|
||||
*Flag = false;
|
||||
}
|
||||
};
|
||||
|
||||
void message_oriented_connection_impl::read_loop()
|
||||
{
|
||||
|
|
@ -145,6 +162,8 @@ namespace graphene { namespace net {
|
|||
const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
|
||||
static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");
|
||||
|
||||
THelper helper(&this->readLoopInProgress);
|
||||
|
||||
_connected_time = fc::time_point::now();
|
||||
|
||||
fc::oexception exception_to_rethrow;
|
||||
|
|
@ -261,8 +280,13 @@ namespace graphene { namespace net {
|
|||
//pad the message we send to a multiple of 16 bytes
|
||||
size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
|
||||
std::unique_ptr<char[]> padded_message(new char[size_with_padding]);
|
||||
|
||||
memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header));
|
||||
memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size );
|
||||
char* paddingSpace = padded_message.get() + sizeof(message_header) + message_to_send.size;
|
||||
size_t toClean = size_with_padding - size_of_message_and_header;
|
||||
memset(paddingSpace, 0, toClean);
|
||||
|
||||
_sock.write(padded_message.get(), size_with_padding);
|
||||
_sock.flush();
|
||||
_bytes_sent += size_with_padding;
|
||||
|
|
|
|||
|
|
@ -5420,17 +5420,21 @@ namespace graphene { namespace net { namespace detail {
|
|||
# define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \
|
||||
try \
|
||||
{ \
|
||||
call_statistics_collector statistics_collector(#method_name, \
|
||||
&_ ## method_name ## _execution_accumulator, \
|
||||
&_ ## method_name ## _delay_before_accumulator, \
|
||||
&_ ## method_name ## _delay_after_accumulator); \
|
||||
if (_thread->is_current()) \
|
||||
{ \
|
||||
call_statistics_collector statistics_collector(#method_name, \
|
||||
&_ ## method_name ## _execution_accumulator, \
|
||||
&_ ## method_name ## _delay_before_accumulator, \
|
||||
&_ ## method_name ## _delay_after_accumulator); \
|
||||
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
|
||||
return _node_delegate->method_name(__VA_ARGS__); \
|
||||
} \
|
||||
else \
|
||||
return _thread->async([&](){ \
|
||||
call_statistics_collector statistics_collector(#method_name, \
|
||||
&_ ## method_name ## _execution_accumulator, \
|
||||
&_ ## method_name ## _delay_before_accumulator, \
|
||||
&_ ## method_name ## _delay_after_accumulator); \
|
||||
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
|
||||
return _node_delegate->method_name(__VA_ARGS__); \
|
||||
}, "invoke " BOOST_STRINGIZE(method_name)).wait(); \
|
||||
|
|
@ -5452,17 +5456,21 @@ namespace graphene { namespace net { namespace detail {
|
|||
}
|
||||
#else
|
||||
# define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \
|
||||
call_statistics_collector statistics_collector(#method_name, \
|
||||
&_ ## method_name ## _execution_accumulator, \
|
||||
&_ ## method_name ## _delay_before_accumulator, \
|
||||
&_ ## method_name ## _delay_after_accumulator); \
|
||||
if (_thread->is_current()) \
|
||||
{ \
|
||||
call_statistics_collector statistics_collector(#method_name, \
|
||||
&_ ## method_name ## _execution_accumulator, \
|
||||
&_ ## method_name ## _delay_before_accumulator, \
|
||||
&_ ## method_name ## _delay_after_accumulator); \
|
||||
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
|
||||
return _node_delegate->method_name(__VA_ARGS__); \
|
||||
} \
|
||||
else \
|
||||
return _thread->async([&](){ \
|
||||
call_statistics_collector statistics_collector(#method_name, \
|
||||
&_ ## method_name ## _execution_accumulator, \
|
||||
&_ ## method_name ## _delay_before_accumulator, \
|
||||
&_ ## method_name ## _delay_after_accumulator); \
|
||||
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
|
||||
return _node_delegate->method_name(__VA_ARGS__); \
|
||||
}, "invoke " BOOST_STRINGIZE(method_name)).wait()
|
||||
|
|
|
|||
Loading…
Reference in a new issue