diff --git a/libraries/net/include/graphene/net/message.hpp b/libraries/net/include/graphene/net/message.hpp index 9cbc0af9..cfef1519 100644 --- a/libraries/net/include/graphene/net/message.hpp +++ b/libraries/net/include/graphene/net/message.hpp @@ -39,8 +39,8 @@ namespace graphene { namespace net { */ struct message_header { - uint32_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE - uint32_t msg_type; // every channel gets a 16 bit message type specifier + uint32_t size = 0; // number of bytes in message, capped at MAX_MESSAGE_SIZE + uint32_t msg_type = 0; // every channel gets a 16 bit message type specifier }; typedef fc::uint160_t message_hash_type; diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index 8c738af1..6f9a4b20 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -166,25 +166,25 @@ namespace graphene { namespace net }; - size_t _total_queued_messages_size; + size_t _total_queued_messages_size = 0; std::queue, std::list > > _queued_messages; fc::future _send_queued_messages_done; public: fc::time_point connection_initiation_time; fc::time_point connection_closed_time; fc::time_point connection_terminated_time; - peer_connection_direction direction; + peer_connection_direction direction = peer_connection_direction::unknown; //connection_state state; - firewalled_state is_firewalled; + firewalled_state is_firewalled = firewalled_state::unknown; fc::microseconds clock_offset; fc::microseconds round_trip_delay; - our_connection_state our_state; - bool they_have_requested_close; - their_connection_state their_state; - bool we_have_requested_close; + our_connection_state our_state = our_connection_state::disconnected; + bool they_have_requested_close = false; + their_connection_state their_state = their_connection_state::disconnected; + bool we_have_requested_close = false; - connection_negotiation_status negotiation_status; + connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected; fc::oexception connection_closed_error; fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); } @@ -199,7 +199,7 @@ namespace graphene { namespace net * from the user_data field of the hello, or if none is present it will be filled with a * copy of node_public_key */ node_id_t node_id; - uint32_t core_protocol_version; + uint32_t core_protocol_version = 0; std::string user_agent; fc::optional graphene_git_revision_sha; fc::optional graphene_git_revision_unix_timestamp; @@ -212,8 +212,8 @@ namespace graphene { namespace net // its hello message. For outbound, they record what we sent the peer // in our hello message fc::ip::address inbound_address; - uint16_t inbound_port; - uint16_t outbound_port; + uint16_t inbound_port = 0; + uint16_t outbound_port = 0; /// @} typedef std::unordered_map item_to_time_map_type; @@ -222,14 +222,14 @@ namespace graphene { namespace net /// @{ boost::container::deque ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about std::set ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing - uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids - bool peer_needs_sync_items_from_us; - bool we_need_sync_items_from_peer; + uint32_t number_of_unfetched_item_ids = 0; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids + bool peer_needs_sync_items_from_us = false; + bool we_need_sync_items_from_peer = false; fc::optional, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy() item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows fc::time_point_sec last_block_time_delegate_has_seen; - bool inhibit_fetching_sync_blocks; + bool inhibit_fetching_sync_blocks = false; /// @} /// non-synchronization state data @@ -259,17 +259,17 @@ namespace graphene { namespace net // blockchain catch up fc::time_point transaction_fetching_inhibited_until; - uint32_t last_known_fork_block_number; + uint32_t last_known_fork_block_number = 0; fc::future accept_or_connect_task_done; - firewall_check_state_data *firewall_check_state; + firewall_check_state_data *firewall_check_state = nullptr; #ifndef NDEBUG private: - fc::thread* _thread; - unsigned _send_message_queue_tasks_running; // temporary debugging + fc::thread* _thread = nullptr; + unsigned _send_message_queue_tasks_running = 0; // temporary debugging #endif - bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system + bool _currently_handling_message = false; // true while we're in the middle of handling a message from the remote system private: peer_connection(peer_connection_delegate* delegate); void destroy(); diff --git a/libraries/net/message_oriented_connection.cpp b/libraries/net/message_oriented_connection.cpp index 5808a038..35c09022 100644 --- a/libraries/net/message_oriented_connection.cpp +++ b/libraries/net/message_oriented_connection.cpp @@ -32,6 +32,8 @@ #include #include +#include + #ifdef DEFAULT_LOGGER # undef DEFAULT_LOGGER #endif @@ -61,7 +63,7 @@ namespace graphene { namespace net { fc::time_point _last_message_sent_time; bool _send_message_in_progress; - + std::atomic_bool readLoopInProgress; #ifndef NDEBUG fc::thread* _thread; #endif @@ -97,7 +99,8 @@ namespace graphene { namespace net { _delegate(delegate), _bytes_received(0), _bytes_sent(0), - _send_message_in_progress(false) + _send_message_in_progress(false), + readLoopInProgress(false) #ifndef NDEBUG ,_thread(&fc::thread::current()) #endif @@ -137,6 +140,20 @@ namespace graphene { namespace net { _sock.bind(local_endpoint); } + class THelper final + { + std::atomic_bool* Flag; + public: + explicit THelper(std::atomic_bool* flag) : Flag(flag) + { + FC_ASSERT(*flag == false, "Only one thread at time can visit it"); + *flag = true; + } + ~THelper() + { + *Flag = false; + } + }; void message_oriented_connection_impl::read_loop() { @@ -145,6 +162,8 @@ namespace graphene { namespace net { const int LEFTOVER = BUFFER_SIZE - sizeof(message_header); static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer"); + THelper helper(&this->readLoopInProgress); + _connected_time = fc::time_point::now(); fc::oexception exception_to_rethrow; @@ -261,8 +280,13 @@ namespace graphene { namespace net { //pad the message we send to a multiple of 16 bytes size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16); std::unique_ptr padded_message(new char[size_with_padding]); + memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header)); memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size ); + char* paddingSpace = padded_message.get() + sizeof(message_header) + message_to_send.size; + size_t toClean = size_with_padding - size_of_message_and_header; + memset(paddingSpace, 0, toClean); + _sock.write(padded_message.get(), size_with_padding); _sock.flush(); _bytes_sent += size_with_padding; diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index a27d7ae7..9d8b9529 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -5420,17 +5420,21 @@ namespace graphene { namespace net { namespace detail { # define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \ try \ { \ - call_statistics_collector statistics_collector(#method_name, \ - &_ ## method_name ## _execution_accumulator, \ - &_ ## method_name ## _delay_before_accumulator, \ - &_ ## method_name ## _delay_after_accumulator); \ if (_thread->is_current()) \ { \ + call_statistics_collector statistics_collector(#method_name, \ + &_ ## method_name ## _execution_accumulator, \ + &_ ## method_name ## _delay_before_accumulator, \ + &_ ## method_name ## _delay_after_accumulator); \ call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \ return _node_delegate->method_name(__VA_ARGS__); \ } \ else \ return _thread->async([&](){ \ + call_statistics_collector statistics_collector(#method_name, \ + &_ ## method_name ## _execution_accumulator, \ + &_ ## method_name ## _delay_before_accumulator, \ + &_ ## method_name ## _delay_after_accumulator); \ call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \ return _node_delegate->method_name(__VA_ARGS__); \ }, "invoke " BOOST_STRINGIZE(method_name)).wait(); \ @@ -5452,17 +5456,21 @@ namespace graphene { namespace net { namespace detail { } #else # define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \ - call_statistics_collector statistics_collector(#method_name, \ - &_ ## method_name ## _execution_accumulator, \ - &_ ## method_name ## _delay_before_accumulator, \ - &_ ## method_name ## _delay_after_accumulator); \ if (_thread->is_current()) \ { \ + call_statistics_collector statistics_collector(#method_name, \ + &_ ## method_name ## _execution_accumulator, \ + &_ ## method_name ## _delay_before_accumulator, \ + &_ ## method_name ## _delay_after_accumulator); \ call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \ return _node_delegate->method_name(__VA_ARGS__); \ } \ else \ return _thread->async([&](){ \ + call_statistics_collector statistics_collector(#method_name, \ + &_ ## method_name ## _execution_accumulator, \ + &_ ## method_name ## _delay_before_accumulator, \ + &_ ## method_name ## _delay_after_accumulator); \ call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \ return _node_delegate->method_name(__VA_ARGS__); \ }, "invoke " BOOST_STRINGIZE(method_name)).wait()