Merge branch 'bug/457-multithread-son-processing' into 'develop'

bug/457-multithread-son-processing

See merge request PBSA/peerplays!160
This commit is contained in:
serkixenos 2022-10-03 17:04:56 +00:00
commit f9314a4c0c
8 changed files with 97 additions and 50 deletions

View file

@ -162,11 +162,14 @@ void database::check_transaction_for_duplicated_operations(const signed_transact
existed_operations_digests.insert( proposed_operations_digests.begin(), proposed_operations_digests.end() );
});
for (auto& pending_transaction: _pending_tx)
{
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
for (auto &pending_transaction : _pending_tx)
{
auto proposed_operations_digests = gather_proposed_operations_digests(pending_transaction);
existed_operations_digests.insert(proposed_operations_digests.begin(), proposed_operations_digests.end());
}
}
auto proposed_operations_digests = gather_proposed_operations_digests(trx);
for (auto& digest: proposed_operations_digests)
@ -187,7 +190,12 @@ bool database::push_block(const signed_block& new_block, uint32_t skip)
bool result;
detail::with_skip_flags( *this, skip, [&]()
{
detail::without_pending_transactions( *this, std::move(_pending_tx),
std::vector<processed_transaction> pending_tx = [this] {
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
return std::move(_pending_tx);
}();
detail::without_pending_transactions( *this, std::move(pending_tx),
[&]()
{
result = _push_block(new_block);
@ -387,17 +395,26 @@ processed_transaction database::_push_transaction( const signed_transaction& trx
{
// If this is the first transaction pushed after applying a block, start a new undo session.
// This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
if( !_pending_tx_session.valid() )
{
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
if (!_pending_tx_session.valid()) {
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
_pending_tx_session = _undo_db.start_undo_session();
}
}
// Create a temporary undo session as a child of _pending_tx_session.
// The temporary session will be discarded by the destructor if
// _apply_transaction fails. If we make it to merge(), we
// apply the changes.
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
auto temp_session = _undo_db.start_undo_session();
auto processed_trx = _apply_transaction( trx );
auto processed_trx = _apply_transaction(trx);
{
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
_pending_tx.push_back(processed_trx);
}
// notify_changed_objects();
// The transaction applied successfully. Merge its changes into the pending block session.
@ -410,6 +427,7 @@ processed_transaction database::_push_transaction( const signed_transaction& trx
processed_transaction database::validate_transaction( const signed_transaction& trx )
{
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
auto session = _undo_db.start_undo_session();
return _apply_transaction( trx );
}
@ -509,47 +527,52 @@ signed_block database::_generate_block(
// the value of the "when" variable is known, which means we need to
// re-apply pending transactions in this method.
//
{
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
_pending_tx_session.reset();
_pending_tx_session = _undo_db.start_undo_session();
}
uint64_t postponed_tx_count = 0;
// pop pending state (reset to head block state)
for( const processed_transaction& tx : _pending_tx )
{
size_t new_total_size = total_block_size + fc::raw::pack_size( tx );
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
for (const processed_transaction &tx : _pending_tx) {
size_t new_total_size = total_block_size + fc::raw::pack_size(tx);
// postpone transaction if it would make block too big
if( new_total_size >= maximum_block_size )
{
if (new_total_size >= maximum_block_size) {
postponed_tx_count++;
continue;
}
try
{
try {
auto temp_session = _undo_db.start_undo_session();
processed_transaction ptx = _apply_transaction( tx );
processed_transaction ptx = _apply_transaction(tx);
temp_session.merge();
// We have to recompute pack_size(ptx) because it may be different
// than pack_size(tx) (i.e. if one or more results increased
// their size)
total_block_size += fc::raw::pack_size( ptx );
pending_block.transactions.push_back( ptx );
}
catch ( const fc::exception& e )
{
total_block_size += fc::raw::pack_size(ptx);
pending_block.transactions.push_back(ptx);
} catch (const fc::exception &e) {
// Do nothing, transaction will not be re-applied
wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) );
wlog( "The transaction was ${t}", ("t", tx) );
wlog("Transaction was not processed while generating block due to ${e}", ("e", e));
wlog("The transaction was ${t}", ("t", tx));
}
}
}
if( postponed_tx_count > 0 )
{
wlog( "Postponed ${n} transactions due to block size limit", ("n", postponed_tx_count) );
}
{
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
_pending_tx_session.reset();
}
// We have temporarily broken the invariant that
// _pending_tx_session is the result of applying _pending_tx, as
@ -597,7 +620,11 @@ signed_block database::_generate_block(
*/
void database::pop_block()
{ try {
{
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
_pending_tx_session.reset();
}
auto head_id = head_block_id();
optional<signed_block> head_block = fetch_block_by_id( head_id );
GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" );
@ -611,6 +638,8 @@ void database::pop_block()
void database::clear_pending()
{ try {
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
assert( (_pending_tx.size() == 0) || _pending_tx_session.valid() );
_pending_tx.clear();
_pending_tx_session.reset();

View file

@ -374,7 +374,9 @@ void database::initialize_hardforks()
void database::initialize_indexes()
{
reset_indexes();
_undo_db.set_max_size( GRAPHENE_MIN_UNDO_HISTORY );
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
_undo_db.set_max_size(GRAPHENE_MIN_UNDO_HISTORY);
//Protocol object indexes
add_index< primary_index<asset_index, 13> >(); // 8192 assets per chunk
@ -474,7 +476,9 @@ void database::init_genesis(const genesis_state_type& genesis_state)
FC_ASSERT(genesis_state.initial_active_witnesses <= genesis_state.initial_witness_candidates.size(),
"initial_active_witnesses is larger than the number of candidate witnesses.");
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
_undo_db.disable();
struct auth_inhibitor {
auth_inhibitor(database& db) : db(db), old_flags(db.node_properties().skip_flags)
{ db.node_properties().skip_flags |= skip_authority_check; }

View file

@ -112,6 +112,7 @@ void database::reindex( fc::path data_dir )
uint32_t undo_point = last_block_num < 50 ? 0 : last_block_num - 50;
ilog( "Replaying blocks, starting at ${next}...", ("next",head_block_num() + 1) );
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
auto_undo_enabler undo(_slow_replays, _undo_db);
if( head_block_num() >= undo_point )
{

View file

@ -520,6 +520,7 @@ namespace graphene { namespace chain {
void notify_changed_objects();
private:
std::mutex _pending_tx_session_mutex;
optional<undo_database::session> _pending_tx_session;
vector< unique_ptr<op_evaluator> > _operation_evaluators;
@ -602,6 +603,7 @@ namespace graphene { namespace chain {
///@}
///@}
std::mutex _pending_tx_mutex;
vector< processed_transaction > _pending_tx;
fork_database _fork_db;

View file

@ -29,6 +29,7 @@
#include <fc/log/logger.hpp>
#include <map>
#include <mutex>
namespace graphene { namespace db {
@ -144,6 +145,7 @@ namespace graphene { namespace db {
fc::path get_data_dir()const { return _data_dir; }
/** public for testing purposes only... should be private in practice. */
mutable std::mutex _undo_db_mutex;
undo_database _undo_db;
protected:
template<typename IndexType>

View file

@ -85,6 +85,7 @@ void account_history_plugin_impl::update_account_histories( const signed_block&
vector<optional< operation_history_object > >& hist = db.get_applied_operations();
bool is_first = true;
auto skip_oho_id = [&is_first,&db,this]() {
const std::lock_guard<std::mutex> undo_db_lock{db._undo_db_mutex};
if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
{
db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );

View file

@ -127,6 +127,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b
const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
bool is_first = true;
auto skip_oho_id = [&is_first,&db,this]() {
const std::lock_guard<std::mutex> undo_db_lock{db._undo_db_mutex};
if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
{
db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );

View file

@ -87,6 +87,7 @@ private:
std::mutex access_db_mutex;
std::mutex access_approve_prop_mutex;
std::mutex access_son_down_prop_mutex;
std::mutex access_son_deregister_prop_mutex;
std::map<sidechain_type, bool> sidechain_enabled;
std::map<sidechain_type, std::unique_ptr<sidechain_net_handler>> net_handlers;
@ -463,7 +464,7 @@ void peerplays_sidechain_plugin_impl::heartbeat_loop() {
//! Check that son is active (at least for one sidechain_type)
bool is_son_active = false;
for (const auto &active_sidechain_type : active_sidechain_types) {
if(sidechain_enabled.at(active_sidechain_type)) {
if (sidechain_enabled.at(active_sidechain_type)) {
if (is_active_son(active_sidechain_type, son_id))
is_son_active = true;
}
@ -501,8 +502,13 @@ void peerplays_sidechain_plugin_impl::schedule_son_processing() {
const auto next_wakeup = now + std::chrono::microseconds(time_to_next_son_processing);
for (const auto &active_sidechain_type : active_sidechain_types) {
if (_son_processing_task.count(active_sidechain_type) != 0 && _son_processing_task.at(active_sidechain_type).wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
wlog("Son doesn't process in time for sidechain: ${active_sidechain_type}", ("active_sidechain_type", active_sidechain_type));
_son_processing_task.at(active_sidechain_type).wait();
}
_son_processing_task[active_sidechain_type] = std::async(std::launch::async, [this, next_wakeup, active_sidechain_type] {
if(sidechain_enabled.at(active_sidechain_type)) {
if (sidechain_enabled.at(active_sidechain_type)) {
std::this_thread::sleep_until(next_wakeup);
son_processing(active_sidechain_type);
}
@ -613,7 +619,7 @@ bool peerplays_sidechain_plugin_impl::can_son_participate(sidechain_type sidecha
std::map<sidechain_type, std::vector<std::string>> peerplays_sidechain_plugin_impl::get_son_listener_log() {
std::map<sidechain_type, std::vector<std::string>> result;
for (const auto &active_sidechain_type : active_sidechain_types) {
if(net_handlers.at(active_sidechain_type)) {
if (net_handlers.at(active_sidechain_type)) {
result.emplace(active_sidechain_type, net_handlers.at(active_sidechain_type)->get_son_listener_log());
}
}
@ -626,7 +632,7 @@ void peerplays_sidechain_plugin_impl::approve_proposals(sidechain_type sidechain
// into problem of approving the same propsal since it might happens that previous
// approved proposal didn't have time or chance to populate the list of available
// active proposals which is consulted here in the code.
std::lock_guard<std::mutex> lck(access_approve_prop_mutex);
const std::lock_guard<std::mutex> lck{access_approve_prop_mutex};
auto check_approve_proposal = [&](const chain::son_id_type &son_id, const chain::proposal_object &proposal) {
if (!is_valid_son_proposal(proposal)) {
return;
@ -676,7 +682,7 @@ void peerplays_sidechain_plugin_impl::approve_proposals(sidechain_type sidechain
}
void peerplays_sidechain_plugin_impl::create_son_down_proposals(sidechain_type sidechain) {
std::lock_guard<std::mutex> lck(access_son_down_prop_mutex);
const std::lock_guard<std::mutex> lck{access_son_down_prop_mutex};
auto create_son_down_proposal = [&](chain::son_id_type son_id, fc::time_point_sec last_active_ts) {
chain::database &d = plugin.database();
const chain::global_property_object &gpo = d.get_global_properties();
@ -740,6 +746,7 @@ void peerplays_sidechain_plugin_impl::create_son_down_proposals(sidechain_type s
}
void peerplays_sidechain_plugin_impl::create_son_deregister_proposals(sidechain_type sidechain) {
const std::lock_guard<std::mutex> lck{access_son_down_prop_mutex};
chain::database &d = plugin.database();
std::set<son_id_type> sons_to_be_dereg = d.get_sons_to_be_deregistered();
chain::son_id_type my_son_id = get_current_son_id(sidechain);
@ -778,49 +785,49 @@ void peerplays_sidechain_plugin_impl::create_son_deregister_proposals(sidechain_
}
void peerplays_sidechain_plugin_impl::process_proposals(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->process_proposals();
}
}
void peerplays_sidechain_plugin_impl::process_active_sons_change(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->process_active_sons_change();
}
}
void peerplays_sidechain_plugin_impl::create_deposit_addresses(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->create_deposit_addresses();
}
}
void peerplays_sidechain_plugin_impl::process_deposits(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->process_deposits();
}
}
void peerplays_sidechain_plugin_impl::process_withdrawals(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->process_withdrawals();
}
}
void peerplays_sidechain_plugin_impl::process_sidechain_transactions(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->process_sidechain_transactions();
}
}
void peerplays_sidechain_plugin_impl::send_sidechain_transactions(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->send_sidechain_transactions();
}
}
void peerplays_sidechain_plugin_impl::settle_sidechain_transactions(sidechain_type sidechain) {
if(net_handlers.at(sidechain)) {
if (net_handlers.at(sidechain)) {
net_handlers.at(sidechain)->settle_sidechain_transactions();
}
}