bug/457-multithread-son-processing
This commit is contained in:
parent
f2f4b57ced
commit
d4c015d400
8 changed files with 97 additions and 50 deletions
|
|
@ -162,10 +162,13 @@ void database::check_transaction_for_duplicated_operations(const signed_transact
|
|||
existed_operations_digests.insert( proposed_operations_digests.begin(), proposed_operations_digests.end() );
|
||||
});
|
||||
|
||||
for (auto& pending_transaction: _pending_tx)
|
||||
{
|
||||
auto proposed_operations_digests = gather_proposed_operations_digests(pending_transaction);
|
||||
existed_operations_digests.insert(proposed_operations_digests.begin(), proposed_operations_digests.end());
|
||||
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
|
||||
for (auto &pending_transaction : _pending_tx)
|
||||
{
|
||||
auto proposed_operations_digests = gather_proposed_operations_digests(pending_transaction);
|
||||
existed_operations_digests.insert(proposed_operations_digests.begin(), proposed_operations_digests.end());
|
||||
}
|
||||
}
|
||||
|
||||
auto proposed_operations_digests = gather_proposed_operations_digests(trx);
|
||||
|
|
@ -187,7 +190,12 @@ bool database::push_block(const signed_block& new_block, uint32_t skip)
|
|||
bool result;
|
||||
detail::with_skip_flags( *this, skip, [&]()
|
||||
{
|
||||
detail::without_pending_transactions( *this, std::move(_pending_tx),
|
||||
std::vector<processed_transaction> pending_tx = [this] {
|
||||
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
|
||||
return std::move(_pending_tx);
|
||||
}();
|
||||
|
||||
detail::without_pending_transactions( *this, std::move(pending_tx),
|
||||
[&]()
|
||||
{
|
||||
result = _push_block(new_block);
|
||||
|
|
@ -387,17 +395,26 @@ processed_transaction database::_push_transaction( const signed_transaction& trx
|
|||
{
|
||||
// If this is the first transaction pushed after applying a block, start a new undo session.
|
||||
// This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
|
||||
if( !_pending_tx_session.valid() )
|
||||
_pending_tx_session = _undo_db.start_undo_session();
|
||||
{
|
||||
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
|
||||
if (!_pending_tx_session.valid()) {
|
||||
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
|
||||
_pending_tx_session = _undo_db.start_undo_session();
|
||||
}
|
||||
}
|
||||
|
||||
// Create a temporary undo session as a child of _pending_tx_session.
|
||||
// The temporary session will be discarded by the destructor if
|
||||
// _apply_transaction fails. If we make it to merge(), we
|
||||
// apply the changes.
|
||||
|
||||
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
|
||||
auto temp_session = _undo_db.start_undo_session();
|
||||
auto processed_trx = _apply_transaction( trx );
|
||||
_pending_tx.push_back(processed_trx);
|
||||
auto processed_trx = _apply_transaction(trx);
|
||||
{
|
||||
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
|
||||
_pending_tx.push_back(processed_trx);
|
||||
}
|
||||
|
||||
// notify_changed_objects();
|
||||
// The transaction applied successfully. Merge its changes into the pending block session.
|
||||
|
|
@ -410,6 +427,7 @@ processed_transaction database::_push_transaction( const signed_transaction& trx
|
|||
|
||||
processed_transaction database::validate_transaction( const signed_transaction& trx )
{
   // Serialize access to the undo database for the duration of validation.
   const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};

   // Open a throwaway undo session: every state change made while applying
   // the transaction is rolled back when `undo_session` is destroyed, so
   // validation leaves the database state untouched.
   auto undo_session = _undo_db.start_undo_session();
   return _apply_transaction( trx );
}
|
||||
|
|
@ -509,47 +527,52 @@ signed_block database::_generate_block(
|
|||
// the value of the "when" variable is known, which means we need to
|
||||
// re-apply pending transactions in this method.
|
||||
//
|
||||
_pending_tx_session.reset();
|
||||
_pending_tx_session = _undo_db.start_undo_session();
|
||||
{
|
||||
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
|
||||
_pending_tx_session.reset();
|
||||
_pending_tx_session = _undo_db.start_undo_session();
|
||||
}
|
||||
|
||||
uint64_t postponed_tx_count = 0;
|
||||
// pop pending state (reset to head block state)
|
||||
for( const processed_transaction& tx : _pending_tx )
|
||||
{
|
||||
size_t new_total_size = total_block_size + fc::raw::pack_size( tx );
|
||||
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
|
||||
for (const processed_transaction &tx : _pending_tx) {
|
||||
size_t new_total_size = total_block_size + fc::raw::pack_size(tx);
|
||||
|
||||
// postpone transaction if it would make block too big
|
||||
if( new_total_size >= maximum_block_size )
|
||||
{
|
||||
postponed_tx_count++;
|
||||
continue;
|
||||
}
|
||||
// postpone transaction if it would make block too big
|
||||
if (new_total_size >= maximum_block_size) {
|
||||
postponed_tx_count++;
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
auto temp_session = _undo_db.start_undo_session();
|
||||
processed_transaction ptx = _apply_transaction( tx );
|
||||
temp_session.merge();
|
||||
try {
|
||||
auto temp_session = _undo_db.start_undo_session();
|
||||
processed_transaction ptx = _apply_transaction(tx);
|
||||
temp_session.merge();
|
||||
|
||||
// We have to recompute pack_size(ptx) because it may be different
|
||||
// than pack_size(tx) (i.e. if one or more results increased
|
||||
// their size)
|
||||
total_block_size += fc::raw::pack_size( ptx );
|
||||
pending_block.transactions.push_back( ptx );
|
||||
}
|
||||
catch ( const fc::exception& e )
|
||||
{
|
||||
// Do nothing, transaction will not be re-applied
|
||||
wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) );
|
||||
wlog( "The transaction was ${t}", ("t", tx) );
|
||||
// We have to recompute pack_size(ptx) because it may be different
|
||||
// than pack_size(tx) (i.e. if one or more results increased
|
||||
// their size)
|
||||
total_block_size += fc::raw::pack_size(ptx);
|
||||
pending_block.transactions.push_back(ptx);
|
||||
} catch (const fc::exception &e) {
|
||||
// Do nothing, transaction will not be re-applied
|
||||
wlog("Transaction was not processed while generating block due to ${e}", ("e", e));
|
||||
wlog("The transaction was ${t}", ("t", tx));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( postponed_tx_count > 0 )
|
||||
{
|
||||
wlog( "Postponed ${n} transactions due to block size limit", ("n", postponed_tx_count) );
|
||||
}
|
||||
|
||||
_pending_tx_session.reset();
|
||||
{
|
||||
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
|
||||
_pending_tx_session.reset();
|
||||
}
|
||||
|
||||
// We have temporarily broken the invariant that
|
||||
// _pending_tx_session is the result of applying _pending_tx, as
|
||||
|
|
@ -597,7 +620,11 @@ signed_block database::_generate_block(
|
|||
*/
|
||||
void database::pop_block()
|
||||
{ try {
|
||||
_pending_tx_session.reset();
|
||||
{
|
||||
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
|
||||
_pending_tx_session.reset();
|
||||
}
|
||||
|
||||
auto head_id = head_block_id();
|
||||
optional<signed_block> head_block = fetch_block_by_id( head_id );
|
||||
GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" );
|
||||
|
|
@ -611,6 +638,8 @@ void database::pop_block()
|
|||
|
||||
void database::clear_pending()
|
||||
{ try {
|
||||
const std::lock_guard<std::mutex> pending_tx_lock{_pending_tx_mutex};
|
||||
const std::lock_guard<std::mutex> pending_tx_session_lock{_pending_tx_session_mutex};
|
||||
assert( (_pending_tx.size() == 0) || _pending_tx_session.valid() );
|
||||
_pending_tx.clear();
|
||||
_pending_tx_session.reset();
|
||||
|
|
|
|||
|
|
@ -374,7 +374,9 @@ void database::initialize_hardforks()
|
|||
void database::initialize_indexes()
|
||||
{
|
||||
reset_indexes();
|
||||
_undo_db.set_max_size( GRAPHENE_MIN_UNDO_HISTORY );
|
||||
|
||||
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
|
||||
_undo_db.set_max_size(GRAPHENE_MIN_UNDO_HISTORY);
|
||||
|
||||
//Protocol object indexes
|
||||
add_index< primary_index<asset_index, 13> >(); // 8192 assets per chunk
|
||||
|
|
@ -474,7 +476,9 @@ void database::init_genesis(const genesis_state_type& genesis_state)
|
|||
FC_ASSERT(genesis_state.initial_active_witnesses <= genesis_state.initial_witness_candidates.size(),
|
||||
"initial_active_witnesses is larger than the number of candidate witnesses.");
|
||||
|
||||
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
|
||||
_undo_db.disable();
|
||||
|
||||
struct auth_inhibitor {
|
||||
auth_inhibitor(database& db) : db(db), old_flags(db.node_properties().skip_flags)
|
||||
{ db.node_properties().skip_flags |= skip_authority_check; }
|
||||
|
|
|
|||
|
|
@ -112,6 +112,7 @@ void database::reindex( fc::path data_dir )
|
|||
uint32_t undo_point = last_block_num < 50 ? 0 : last_block_num - 50;
|
||||
|
||||
ilog( "Replaying blocks, starting at ${next}...", ("next",head_block_num() + 1) );
|
||||
const std::lock_guard<std::mutex> undo_db_lock{_undo_db_mutex};
|
||||
auto_undo_enabler undo(_slow_replays, _undo_db);
|
||||
if( head_block_num() >= undo_point )
|
||||
{
|
||||
|
|
|
|||
|
|
@ -520,6 +520,7 @@ namespace graphene { namespace chain {
|
|||
void notify_changed_objects();
|
||||
|
||||
private:
|
||||
std::mutex _pending_tx_session_mutex;
|
||||
optional<undo_database::session> _pending_tx_session;
|
||||
vector< unique_ptr<op_evaluator> > _operation_evaluators;
|
||||
|
||||
|
|
@ -602,6 +603,7 @@ namespace graphene { namespace chain {
|
|||
///@}
|
||||
///@}
|
||||
|
||||
std::mutex _pending_tx_mutex;
|
||||
vector< processed_transaction > _pending_tx;
|
||||
fork_database _fork_db;
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@
|
|||
#include <fc/log/logger.hpp>
|
||||
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
|
||||
namespace graphene { namespace db {
|
||||
|
||||
|
|
@ -144,6 +145,7 @@ namespace graphene { namespace db {
|
|||
fc::path get_data_dir()const { return _data_dir; }
|
||||
|
||||
/** public for testing purposes only... should be private in practice. */
|
||||
mutable std::mutex _undo_db_mutex;
|
||||
undo_database _undo_db;
|
||||
protected:
|
||||
template<typename IndexType>
|
||||
|
|
|
|||
|
|
@ -85,6 +85,7 @@ void account_history_plugin_impl::update_account_histories( const signed_block&
|
|||
vector<optional< operation_history_object > >& hist = db.get_applied_operations();
|
||||
bool is_first = true;
|
||||
auto skip_oho_id = [&is_first,&db,this]() {
|
||||
const std::lock_guard<std::mutex> undo_db_lock{db._undo_db_mutex};
|
||||
if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
|
||||
{
|
||||
db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );
|
||||
|
|
|
|||
|
|
@ -127,6 +127,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b
|
|||
const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
|
||||
bool is_first = true;
|
||||
auto skip_oho_id = [&is_first,&db,this]() {
|
||||
const std::lock_guard<std::mutex> undo_db_lock{db._undo_db_mutex};
|
||||
if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
|
||||
{
|
||||
db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );
|
||||
|
|
|
|||
|
|
@ -87,6 +87,7 @@ private:
|
|||
std::mutex access_db_mutex;
|
||||
std::mutex access_approve_prop_mutex;
|
||||
std::mutex access_son_down_prop_mutex;
|
||||
std::mutex access_son_deregister_prop_mutex;
|
||||
|
||||
std::map<sidechain_type, bool> sidechain_enabled;
|
||||
std::map<sidechain_type, std::unique_ptr<sidechain_net_handler>> net_handlers;
|
||||
|
|
@ -463,7 +464,7 @@ void peerplays_sidechain_plugin_impl::heartbeat_loop() {
|
|||
//! Check that son is active (at least for one sidechain_type)
|
||||
bool is_son_active = false;
|
||||
for (const auto &active_sidechain_type : active_sidechain_types) {
|
||||
if(sidechain_enabled.at(active_sidechain_type)) {
|
||||
if (sidechain_enabled.at(active_sidechain_type)) {
|
||||
if (is_active_son(active_sidechain_type, son_id))
|
||||
is_son_active = true;
|
||||
}
|
||||
|
|
@ -501,8 +502,13 @@ void peerplays_sidechain_plugin_impl::schedule_son_processing() {
|
|||
const auto next_wakeup = now + std::chrono::microseconds(time_to_next_son_processing);
|
||||
|
||||
for (const auto &active_sidechain_type : active_sidechain_types) {
|
||||
if (_son_processing_task.count(active_sidechain_type) != 0 && _son_processing_task.at(active_sidechain_type).wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
|
||||
wlog("Son doesn't process in time for sidechain: ${active_sidechain_type}", ("active_sidechain_type", active_sidechain_type));
|
||||
_son_processing_task.at(active_sidechain_type).wait();
|
||||
}
|
||||
|
||||
_son_processing_task[active_sidechain_type] = std::async(std::launch::async, [this, next_wakeup, active_sidechain_type] {
|
||||
if(sidechain_enabled.at(active_sidechain_type)) {
|
||||
if (sidechain_enabled.at(active_sidechain_type)) {
|
||||
std::this_thread::sleep_until(next_wakeup);
|
||||
son_processing(active_sidechain_type);
|
||||
}
|
||||
|
|
@ -613,7 +619,7 @@ bool peerplays_sidechain_plugin_impl::can_son_participate(sidechain_type sidecha
|
|||
std::map<sidechain_type, std::vector<std::string>> peerplays_sidechain_plugin_impl::get_son_listener_log() {
|
||||
std::map<sidechain_type, std::vector<std::string>> result;
|
||||
for (const auto &active_sidechain_type : active_sidechain_types) {
|
||||
if(net_handlers.at(active_sidechain_type)) {
|
||||
if (net_handlers.at(active_sidechain_type)) {
|
||||
result.emplace(active_sidechain_type, net_handlers.at(active_sidechain_type)->get_son_listener_log());
|
||||
}
|
||||
}
|
||||
|
|
@ -626,7 +632,7 @@ void peerplays_sidechain_plugin_impl::approve_proposals(sidechain_type sidechain
|
|||
// into problem of approving the same propsal since it might happens that previous
|
||||
// approved proposal didn't have time or chance to populate the list of available
|
||||
// active proposals which is consulted here in the code.
|
||||
std::lock_guard<std::mutex> lck(access_approve_prop_mutex);
|
||||
const std::lock_guard<std::mutex> lck{access_approve_prop_mutex};
|
||||
auto check_approve_proposal = [&](const chain::son_id_type &son_id, const chain::proposal_object &proposal) {
|
||||
if (!is_valid_son_proposal(proposal)) {
|
||||
return;
|
||||
|
|
@ -676,7 +682,7 @@ void peerplays_sidechain_plugin_impl::approve_proposals(sidechain_type sidechain
|
|||
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::create_son_down_proposals(sidechain_type sidechain) {
|
||||
std::lock_guard<std::mutex> lck(access_son_down_prop_mutex);
|
||||
const std::lock_guard<std::mutex> lck{access_son_down_prop_mutex};
|
||||
auto create_son_down_proposal = [&](chain::son_id_type son_id, fc::time_point_sec last_active_ts) {
|
||||
chain::database &d = plugin.database();
|
||||
const chain::global_property_object &gpo = d.get_global_properties();
|
||||
|
|
@ -740,6 +746,7 @@ void peerplays_sidechain_plugin_impl::create_son_down_proposals(sidechain_type s
|
|||
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::create_son_deregister_proposals(sidechain_type sidechain) {
|
||||
const std::lock_guard<std::mutex> lck{access_son_down_prop_mutex};
|
||||
chain::database &d = plugin.database();
|
||||
std::set<son_id_type> sons_to_be_dereg = d.get_sons_to_be_deregistered();
|
||||
chain::son_id_type my_son_id = get_current_son_id(sidechain);
|
||||
|
|
@ -778,49 +785,49 @@ void peerplays_sidechain_plugin_impl::create_son_deregister_proposals(sidechain_
|
|||
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::process_proposals(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->process_proposals();
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::process_active_sons_change(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->process_active_sons_change();
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::create_deposit_addresses(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->create_deposit_addresses();
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::process_deposits(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->process_deposits();
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::process_withdrawals(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->process_withdrawals();
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::process_sidechain_transactions(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->process_sidechain_transactions();
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::send_sidechain_transactions(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->send_sidechain_transactions();
}
|
||||
|
||||
void peerplays_sidechain_plugin_impl::settle_sidechain_transactions(sidechain_type sidechain) {
   // No-op when this sidechain has no configured net handler.
   const auto &handler = net_handlers.at(sidechain);
   if (!handler)
      return;
   handler->settle_sidechain_transactions();
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue