diff --git a/libraries/chain/db_block.cpp b/libraries/chain/db_block.cpp index 65f10cc7..d49f1dc5 100644 --- a/libraries/chain/db_block.cpp +++ b/libraries/chain/db_block.cpp @@ -162,10 +162,13 @@ void database::check_transaction_for_duplicated_operations(const signed_transact existed_operations_digests.insert( proposed_operations_digests.begin(), proposed_operations_digests.end() ); }); - for (auto& pending_transaction: _pending_tx) { - auto proposed_operations_digests = gather_proposed_operations_digests(pending_transaction); - existed_operations_digests.insert(proposed_operations_digests.begin(), proposed_operations_digests.end()); + const std::lock_guard pending_tx_lock{_pending_tx_mutex}; + for (auto &pending_transaction : _pending_tx) + { + auto proposed_operations_digests = gather_proposed_operations_digests(pending_transaction); + existed_operations_digests.insert(proposed_operations_digests.begin(), proposed_operations_digests.end()); + } } auto proposed_operations_digests = gather_proposed_operations_digests(trx); @@ -187,7 +190,12 @@ bool database::push_block(const signed_block& new_block, uint32_t skip) bool result; detail::with_skip_flags( *this, skip, [&]() { - detail::without_pending_transactions( *this, std::move(_pending_tx), + std::vector pending_tx = [this] { + const std::lock_guard pending_tx_lock{_pending_tx_mutex}; + return std::move(_pending_tx); + }(); + + detail::without_pending_transactions( *this, std::move(pending_tx), [&]() { result = _push_block(new_block); @@ -387,17 +395,26 @@ processed_transaction database::_push_transaction( const signed_transaction& trx { // If this is the first transaction pushed after applying a block, start a new undo session. // This allows us to quickly rewind to the clean state of the head block, in case a new block arrives. - if( !_pending_tx_session.valid() ) - _pending_tx_session = _undo_db.start_undo_session(); + { + const std::lock_guard pending_tx_session_lock{_pending_tx_session_mutex}; + if (!_pending_tx_session.valid()) { + const std::lock_guard undo_db_lock{_undo_db_mutex}; + _pending_tx_session = _undo_db.start_undo_session(); + } + } // Create a temporary undo session as a child of _pending_tx_session. // The temporary session will be discarded by the destructor if // _apply_transaction fails. If we make it to merge(), we // apply the changes. + const std::lock_guard undo_db_lock{_undo_db_mutex}; auto temp_session = _undo_db.start_undo_session(); - auto processed_trx = _apply_transaction( trx ); - _pending_tx.push_back(processed_trx); + auto processed_trx = _apply_transaction(trx); + { + const std::lock_guard pending_tx_lock{_pending_tx_mutex}; + _pending_tx.push_back(processed_trx); + } // notify_changed_objects(); // The transaction applied successfully. Merge its changes into the pending block session. @@ -410,6 +427,7 @@ processed_transaction database::_push_transaction( const signed_transaction& trx processed_transaction database::validate_transaction( const signed_transaction& trx ) { + const std::lock_guard undo_db_lock{_undo_db_mutex}; auto session = _undo_db.start_undo_session(); return _apply_transaction( trx ); } @@ -509,47 +527,52 @@ signed_block database::_generate_block( // the value of the "when" variable is known, which means we need to // re-apply pending transactions in this method. // - _pending_tx_session.reset(); - _pending_tx_session = _undo_db.start_undo_session(); + { + const std::lock_guard pending_tx_session_lock{_pending_tx_session_mutex}; + _pending_tx_session.reset(); + _pending_tx_session = _undo_db.start_undo_session(); + } uint64_t postponed_tx_count = 0; // pop pending state (reset to head block state) - for( const processed_transaction& tx : _pending_tx ) { - size_t new_total_size = total_block_size + fc::raw::pack_size( tx ); + const std::lock_guard pending_tx_lock{_pending_tx_mutex}; + for (const processed_transaction &tx : _pending_tx) { + size_t new_total_size = total_block_size + fc::raw::pack_size(tx); - // postpone transaction if it would make block too big - if( new_total_size >= maximum_block_size ) - { - postponed_tx_count++; - continue; - } + // postpone transaction if it would make block too big + if (new_total_size >= maximum_block_size) { + postponed_tx_count++; + continue; + } - try - { - auto temp_session = _undo_db.start_undo_session(); - processed_transaction ptx = _apply_transaction( tx ); - temp_session.merge(); + try { + auto temp_session = _undo_db.start_undo_session(); + processed_transaction ptx = _apply_transaction(tx); + temp_session.merge(); - // We have to recompute pack_size(ptx) because it may be different - // than pack_size(tx) (i.e. if one or more results increased - // their size) - total_block_size += fc::raw::pack_size( ptx ); - pending_block.transactions.push_back( ptx ); - } - catch ( const fc::exception& e ) - { - // Do nothing, transaction will not be re-applied - wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) ); - wlog( "The transaction was ${t}", ("t", tx) ); + // We have to recompute pack_size(ptx) because it may be different + // than pack_size(tx) (i.e. if one or more results increased + // their size) + total_block_size += fc::raw::pack_size(ptx); + pending_block.transactions.push_back(ptx); + } catch (const fc::exception &e) { + // Do nothing, transaction will not be re-applied + wlog("Transaction was not processed while generating block due to ${e}", ("e", e)); + wlog("The transaction was ${t}", ("t", tx)); + } } } + if( postponed_tx_count > 0 ) { wlog( "Postponed ${n} transactions due to block size limit", ("n", postponed_tx_count) ); } - _pending_tx_session.reset(); + { + const std::lock_guard pending_tx_session_lock{_pending_tx_session_mutex}; + _pending_tx_session.reset(); + } // We have temporarily broken the invariant that // _pending_tx_session is the result of applying _pending_tx, as @@ -597,7 +620,11 @@ signed_block database::_generate_block( */ void database::pop_block() { try { - _pending_tx_session.reset(); + { + const std::lock_guard pending_tx_session_lock{_pending_tx_session_mutex}; + _pending_tx_session.reset(); + } + auto head_id = head_block_id(); optional head_block = fetch_block_by_id( head_id ); GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" ); @@ -611,6 +638,8 @@ void database::pop_block() void database::clear_pending() { try { + const std::lock_guard pending_tx_lock{_pending_tx_mutex}; + const std::lock_guard pending_tx_session_lock{_pending_tx_session_mutex}; assert( (_pending_tx.size() == 0) || _pending_tx_session.valid() ); _pending_tx.clear(); _pending_tx_session.reset(); diff --git a/libraries/chain/db_init.cpp b/libraries/chain/db_init.cpp index e9f3b9f5..82d7fde1 100644 --- a/libraries/chain/db_init.cpp +++ b/libraries/chain/db_init.cpp @@ -374,7 +374,9 @@ void database::initialize_hardforks() void database::initialize_indexes() { reset_indexes(); - _undo_db.set_max_size( GRAPHENE_MIN_UNDO_HISTORY ); + + const std::lock_guard undo_db_lock{_undo_db_mutex}; + _undo_db.set_max_size(GRAPHENE_MIN_UNDO_HISTORY); //Protocol object indexes add_index< primary_index >(); // 8192 assets per chunk @@ -474,7 +476,9 @@ void database::init_genesis(const genesis_state_type& genesis_state) FC_ASSERT(genesis_state.initial_active_witnesses <= genesis_state.initial_witness_candidates.size(), "initial_active_witnesses is larger than the number of candidate witnesses."); + const std::lock_guard undo_db_lock{_undo_db_mutex}; _undo_db.disable(); + struct auth_inhibitor { auth_inhibitor(database& db) : db(db), old_flags(db.node_properties().skip_flags) { db.node_properties().skip_flags |= skip_authority_check; } diff --git a/libraries/chain/db_management.cpp b/libraries/chain/db_management.cpp index 4a3b519f..dea75bc6 100644 --- a/libraries/chain/db_management.cpp +++ b/libraries/chain/db_management.cpp @@ -112,6 +112,7 @@ void database::reindex( fc::path data_dir ) uint32_t undo_point = last_block_num < 50 ? 0 : last_block_num - 50; ilog( "Replaying blocks, starting at ${next}...", ("next",head_block_num() + 1) ); + const std::lock_guard undo_db_lock{_undo_db_mutex}; auto_undo_enabler undo(_slow_replays, _undo_db); if( head_block_num() >= undo_point ) { diff --git a/libraries/chain/include/graphene/chain/database.hpp b/libraries/chain/include/graphene/chain/database.hpp index eeb25167..2a432732 100644 --- a/libraries/chain/include/graphene/chain/database.hpp +++ b/libraries/chain/include/graphene/chain/database.hpp @@ -520,6 +520,7 @@ namespace graphene { namespace chain { void notify_changed_objects(); private: + std::mutex _pending_tx_session_mutex; optional _pending_tx_session; vector< unique_ptr > _operation_evaluators; @@ -602,6 +603,7 @@ namespace graphene { namespace chain { ///@} ///@} + std::mutex _pending_tx_mutex; vector< processed_transaction > _pending_tx; fork_database _fork_db; diff --git a/libraries/db/include/graphene/db/object_database.hpp b/libraries/db/include/graphene/db/object_database.hpp index fa2109aa..e76e5a83 100644 --- a/libraries/db/include/graphene/db/object_database.hpp +++ b/libraries/db/include/graphene/db/object_database.hpp @@ -29,6 +29,7 @@ #include #include +#include namespace graphene { namespace db { @@ -144,6 +145,7 @@ namespace graphene { namespace db { fc::path get_data_dir()const { return _data_dir; } /** public for testing purposes only... should be private in practice. */ + mutable std::mutex _undo_db_mutex; undo_database _undo_db; protected: template diff --git a/libraries/plugins/account_history/account_history_plugin.cpp b/libraries/plugins/account_history/account_history_plugin.cpp index 60ce64f8..a47496f4 100644 --- a/libraries/plugins/account_history/account_history_plugin.cpp +++ b/libraries/plugins/account_history/account_history_plugin.cpp @@ -85,6 +85,7 @@ void account_history_plugin_impl::update_account_histories( const signed_block& vector >& hist = db.get_applied_operations(); bool is_first = true; auto skip_oho_id = [&is_first,&db,this]() { + const std::lock_guard undo_db_lock{db._undo_db_mutex}; if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo { db.remove( db.create( []( operation_history_object& obj) {} ) ); diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index e306054a..fae54da2 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -127,6 +127,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b const vector >& hist = db.get_applied_operations(); bool is_first = true; auto skip_oho_id = [&is_first,&db,this]() { + const std::lock_guard undo_db_lock{db._undo_db_mutex}; if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo { db.remove( db.create( []( operation_history_object& obj) {} ) ); diff --git a/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp b/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp index b52f9906..27784720 100644 --- a/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp +++ b/libraries/plugins/peerplays_sidechain/peerplays_sidechain_plugin.cpp @@ -87,6 +87,7 @@ private: std::mutex access_db_mutex; std::mutex access_approve_prop_mutex; std::mutex access_son_down_prop_mutex; + std::mutex access_son_deregister_prop_mutex; std::map sidechain_enabled; std::map> net_handlers; @@ -463,7 +464,7 @@ void peerplays_sidechain_plugin_impl::heartbeat_loop() { //! Check that son is active (at least for one sidechain_type) bool is_son_active = false; for (const auto &active_sidechain_type : active_sidechain_types) { - if(sidechain_enabled.at(active_sidechain_type)) { + if (sidechain_enabled.at(active_sidechain_type)) { if (is_active_son(active_sidechain_type, son_id)) is_son_active = true; } @@ -501,8 +502,13 @@ void peerplays_sidechain_plugin_impl::schedule_son_processing() { const auto next_wakeup = now + std::chrono::microseconds(time_to_next_son_processing); for (const auto &active_sidechain_type : active_sidechain_types) { + if (_son_processing_task.count(active_sidechain_type) != 0 && _son_processing_task.at(active_sidechain_type).wait_for(std::chrono::seconds{0}) != std::future_status::ready) { + wlog("Son doesn't process in time for sidechain: ${active_sidechain_type}", ("active_sidechain_type", active_sidechain_type)); + _son_processing_task.at(active_sidechain_type).wait(); + } + _son_processing_task[active_sidechain_type] = std::async(std::launch::async, [this, next_wakeup, active_sidechain_type] { - if(sidechain_enabled.at(active_sidechain_type)) { + if (sidechain_enabled.at(active_sidechain_type)) { std::this_thread::sleep_until(next_wakeup); son_processing(active_sidechain_type); } @@ -613,7 +619,7 @@ bool peerplays_sidechain_plugin_impl::can_son_participate(sidechain_type sidecha std::map> peerplays_sidechain_plugin_impl::get_son_listener_log() { std::map> result; for (const auto &active_sidechain_type : active_sidechain_types) { - if(net_handlers.at(active_sidechain_type)) { + if (net_handlers.at(active_sidechain_type)) { result.emplace(active_sidechain_type, net_handlers.at(active_sidechain_type)->get_son_listener_log()); } } @@ -626,7 +632,7 @@ void peerplays_sidechain_plugin_impl::approve_proposals(sidechain_type sidechain // into problem of approving the same propsal since it might happens that previous // approved proposal didn't have time or chance to populate the list of available // active proposals which is consulted here in the code. - std::lock_guard lck(access_approve_prop_mutex); + const std::lock_guard lck{access_approve_prop_mutex}; auto check_approve_proposal = [&](const chain::son_id_type &son_id, const chain::proposal_object &proposal) { if (!is_valid_son_proposal(proposal)) { return; @@ -676,7 +682,7 @@ void peerplays_sidechain_plugin_impl::approve_proposals(sidechain_type sidechain } void peerplays_sidechain_plugin_impl::create_son_down_proposals(sidechain_type sidechain) { - std::lock_guard lck(access_son_down_prop_mutex); + const std::lock_guard lck{access_son_down_prop_mutex}; auto create_son_down_proposal = [&](chain::son_id_type son_id, fc::time_point_sec last_active_ts) { chain::database &d = plugin.database(); const chain::global_property_object &gpo = d.get_global_properties(); @@ -740,6 +746,7 @@ void peerplays_sidechain_plugin_impl::create_son_down_proposals(sidechain_type s } void peerplays_sidechain_plugin_impl::create_son_deregister_proposals(sidechain_type sidechain) { + const std::lock_guard lck{access_son_down_prop_mutex}; chain::database &d = plugin.database(); std::set sons_to_be_dereg = d.get_sons_to_be_deregistered(); chain::son_id_type my_son_id = get_current_son_id(sidechain); @@ -778,49 +785,49 @@ void peerplays_sidechain_plugin_impl::create_son_deregister_proposals(sidechain_ } void peerplays_sidechain_plugin_impl::process_proposals(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->process_proposals(); } } void peerplays_sidechain_plugin_impl::process_active_sons_change(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->process_active_sons_change(); } } void peerplays_sidechain_plugin_impl::create_deposit_addresses(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->create_deposit_addresses(); } } void peerplays_sidechain_plugin_impl::process_deposits(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->process_deposits(); } } void peerplays_sidechain_plugin_impl::process_withdrawals(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->process_withdrawals(); } } void peerplays_sidechain_plugin_impl::process_sidechain_transactions(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->process_sidechain_transactions(); } } void peerplays_sidechain_plugin_impl::send_sidechain_transactions(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->send_sidechain_transactions(); } } void peerplays_sidechain_plugin_impl::settle_sidechain_transactions(sidechain_type sidechain) { - if(net_handlers.at(sidechain)) { + if (net_handlers.at(sidechain)) { net_handlers.at(sidechain)->settle_sidechain_transactions(); } }