From e0414d390e684ae59a326dff5496f04813f215c3 Mon Sep 17 00:00:00 2001 From: theoreticalbts Date: Wed, 16 Sep 2015 15:50:09 -0400 Subject: [PATCH] Fix pending_block and fork handling The pending_block member of database was a premature optimization and had an unfortunate tendency to get out of sync, especially when switching forks. This commit removes it, and substantially improves the handling of transactions when switching forks. Specifically, flooding or forking no longer causes nodes to discard valid transactions. --- libraries/chain/db_block.cpp | 169 ++++++++++-------- libraries/chain/db_maint.cpp | 21 +-- libraries/chain/db_management.cpp | 12 +- libraries/chain/db_update.cpp | 6 - libraries/chain/fork_database.cpp | 2 + .../chain/include/graphene/chain/database.hpp | 10 +- .../chain/include/graphene/chain/db_with.hpp | 23 ++- .../include/graphene/chain/exceptions.hpp | 2 + 8 files changed, 131 insertions(+), 114 deletions(-) diff --git a/libraries/chain/db_block.cpp b/libraries/chain/db_block.cpp index 7b7933da..edd19243 100644 --- a/libraries/chain/db_block.cpp +++ b/libraries/chain/db_block.cpp @@ -106,7 +106,7 @@ bool database::push_block(const signed_block& new_block, uint32_t skip) bool result; detail::with_skip_flags( *this, skip, [&]() { - detail::without_pending_transactions( *this, std::move(_pending_block.transactions), + detail::without_pending_transactions( *this, std::move(_pending_tx), [&]() { result = _push_block(new_block); @@ -131,7 +131,7 @@ bool database::_push_block(const signed_block& new_block) //Only switch forks if new_head is actually higher than head if( new_head->data.block_num() > head_block_num() ) { - auto branches = _fork_db.fetch_branch_from(new_head->data.id(), _pending_block.previous); + auto branches = _fork_db.fetch_branch_from(new_head->data.id(), head_block_id()); // pop blocks until we hit the forked block while( head_block_id() != branches.second.back()->data.previous ) @@ -214,20 +214,23 @@ processed_transaction database::push_transaction( const signed_transaction& trx, processed_transaction database::_push_transaction( const signed_transaction& trx ) { - uint32_t skip = get_node_properties().skip_flags; // If this is the first transaction pushed after applying a block, start a new undo session. // This allows us to quickly rewind to the clean state of the head block, in case a new block arrives. - if( !_pending_block_session ) _pending_block_session = _undo_db.start_undo_session(); - auto session = _undo_db.start_undo_session(); - auto processed_trx = _apply_transaction( trx ); - _pending_block.transactions.push_back(processed_trx); + if( !_pending_tx_session.valid() ) + _pending_tx_session = _undo_db.start_undo_session(); - FC_ASSERT( (skip & skip_block_size_check) || - fc::raw::pack_size(_pending_block) <= get_global_properties().parameters.maximum_block_size ); + // Create a temporary undo session as a child of _pending_tx_session. + // The temporary session will be discarded by the destructor if + // _apply_transaction fails. If we make it to merge(), we + // apply the changes. + + auto temp_session = _undo_db.start_undo_session(); + auto processed_trx = _apply_transaction( trx ); + _pending_tx.push_back(processed_trx); notify_changed_objects(); // The transaction applied successfully. Merge its changes into the pending block session. - session.merge(); + temp_session.merge(); // notify anyone listening to pending transactions on_pending_transaction( trx ); @@ -293,68 +296,77 @@ signed_block database::_generate_block( if( !(skip & skip_witness_signature) ) FC_ASSERT( witness_obj.signing_key == block_signing_private_key.get_public_key() ); - _pending_block.timestamp = when; + static const size_t max_block_header_size = fc::raw::pack_size( signed_block_header() ) + 4; + auto maximum_block_size = get_global_properties().parameters.maximum_block_size; + size_t total_block_size = max_block_header_size; - _pending_block.transaction_merkle_root = _pending_block.calculate_merkle_root(); + signed_block pending_block; - _pending_block.witness = witness_id; - block_id_type head_id = head_block_id(); - if( _pending_block.previous != head_id ) + // + // The following code throws away existing pending_tx_session and + // rebuilds it by re-applying pending transactions. + // + // This rebuild is necessary because pending transactions' validity + // and semantics may have changed since they were received, because + // time-based semantics are evaluated based on the current block + // time. These changes can only be reflected in the database when + // the value of the "when" variable is known, which means we need to + // re-apply pending transactions in this method. + // + _pending_tx_session.reset(); + _pending_tx_session = _undo_db.start_undo_session(); + + uint64_t postponed_tx_count = 0; + // pop pending state (reset to head block state) + for( const processed_transaction& tx : _pending_tx ) { - wlog( "_pending_block.previous was ${old}, setting to head_block_id ${new}", ("old", _pending_block.previous)("new", head_id) ); - _pending_block.previous = head_id; - } - if( !(skip & skip_witness_signature) ) _pending_block.sign( block_signing_private_key ); + size_t new_total_size = total_block_size + fc::raw::pack_size( tx ); - FC_ASSERT( fc::raw::pack_size(_pending_block) <= get_global_properties().parameters.maximum_block_size ); - signed_block tmp = _pending_block; - tmp.transaction_merkle_root = tmp.calculate_merkle_root(); - _pending_block.transactions.clear(); - - bool failed = false; - try { push_block( tmp, skip ); } - catch ( const undo_database_exception& e ) { throw; } - catch ( const fc::exception& e ) - { - if( !retry_on_failure ) + // postpone transaction if it would make block too big + if( new_total_size >= maximum_block_size ) { - failed = true; + postponed_tx_count++; + continue; } - else + + try { - wlog( "Reason for block production failure: ${e}", ("e",e) ); - throw; + auto temp_session = _undo_db.start_undo_session(); + processed_transaction ptx = _apply_transaction( tx ); + temp_session.merge(); + + // We have to recompute pack_size(ptx) because it may be different + // than pack_size(tx) (i.e. if one or more results increased + // their size) + total_block_size += fc::raw::pack_size( ptx ); + pending_block.transactions.push_back( ptx ); + } + catch ( const fc::exception& e ) + { + // Do nothing, transaction will not be re-applied + wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) ); + wlog( "The transaction was ${t}", ("t", tx) ); } } - if( failed ) + if( postponed_tx_count > 0 ) { - uint32_t failed_tx_count = 0; - for( const auto& trx : tmp.transactions ) - { - try - { - push_transaction( trx, skip ); - } - catch ( const fc::exception& e ) - { - wlog( "Transaction is no longer valid: ${trx}", ("trx",trx) ); - failed_tx_count++; - } - } - if( failed_tx_count == 0 ) - { - // - // this is in generate_block() so this intensive logging - // (dumping a whole block) should be rate-limited - // to once per block production attempt - // - // TODO: Turn this off again once #261 is resolved. - // - wlog( "Block creation failed even though all tx's are still valid. Block: ${b}", ("b",tmp) ); - } - return _generate_block( when, witness_id, block_signing_private_key, false ); + wlog( "Postponed ${n} transactions due to block size limit", ("n", postponed_tx_count) ); } - return tmp; + _pending_tx_session.reset(); + + pending_block.previous = head_block_id(); + pending_block.timestamp = when; + pending_block.transaction_merkle_root = pending_block.calculate_merkle_root(); + pending_block.witness = witness_id; + + if( !(skip & skip_witness_signature) ) + pending_block.sign( block_signing_private_key ); + + FC_ASSERT( fc::raw::pack_size(pending_block) <= get_global_properties().parameters.maximum_block_size ); + + push_block( pending_block, skip ); + + return pending_block; } FC_CAPTURE_AND_RETHROW( (witness_id) ) } /** @@ -363,19 +375,23 @@ signed_block database::_generate_block( */ void database::pop_block() { try { - _pending_block_session.reset(); - auto prev = _pending_block.previous; + _pending_tx_session.reset(); + auto head_id = head_block_id(); + optional head_block = fetch_block_by_id( head_id ); + GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" ); pop_undo(); - _block_id_to_block.remove( prev ); - _pending_block.previous = head_block_id(); - _pending_block.timestamp = head_block_time(); + _block_id_to_block.remove( head_id ); _fork_db.pop_block(); + + _popped_tx.insert( _popped_tx.begin(), head_block->transactions.begin(), head_block->transactions.end() ); + } FC_CAPTURE_AND_RETHROW() } void database::clear_pending() { try { - _pending_block.transactions.clear(); - _pending_block_session.reset(); + assert( (_pending_tx.size() == 0) || _pending_tx_session.valid() ); + _pending_tx.clear(); + _pending_tx_session.reset(); } FC_CAPTURE_AND_RETHROW() } uint32_t database::push_applied_operation( const operation& op ) @@ -451,11 +467,8 @@ void database::_apply_block( const signed_block& next_block ) update_global_dynamic_data(next_block); update_signing_witness(signing_witness, next_block); - auto current_block_interval = global_props.parameters.block_interval; - // Are we at the maintenance interval? if( maint_needed ) - // This will update _pending_block.timestamp if the block interval has changed perform_chain_maintenance(next_block, global_props); create_block_summary(next_block); @@ -477,8 +490,6 @@ void database::_apply_block( const signed_block& next_block ) _applied_ops.clear(); notify_changed_objects(); - - update_pending_block(next_block, current_block_interval); } FC_CAPTURE_AND_RETHROW( (next_block.block_num()) ) } void database::notify_changed_objects() @@ -542,9 +553,11 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx FC_ASSERT( trx.ref_block_prefix == tapos_block_summary.block_id._hash[1] ); } - FC_ASSERT( trx.expiration <= _pending_block.timestamp + chain_parameters.maximum_time_until_expiration, "", - ("trx.expiration",trx.expiration)("_pending_block.timestamp",_pending_block.timestamp)("max_til_exp",chain_parameters.maximum_time_until_expiration)); - FC_ASSERT( _pending_block.timestamp <= trx.expiration, "", ("pending.timestamp",_pending_block.timestamp)("trx.exp",trx.expiration) ); + fc::time_point_sec now = head_block_time(); + + FC_ASSERT( trx.expiration <= now + chain_parameters.maximum_time_until_expiration, "", + ("trx.expiration",trx.expiration)("now",now)("max_til_exp",chain_parameters.maximum_time_until_expiration)); + FC_ASSERT( now <= trx.expiration, "", ("now",now)("trx.exp",trx.expiration) ); } //Insert transaction into unique transactions database. @@ -595,8 +608,8 @@ operation_result database::apply_operation(transaction_evaluation_state& eval_st const witness_object& database::validate_block_header( uint32_t skip, const signed_block& next_block )const { - FC_ASSERT( _pending_block.previous == next_block.previous, "", ("pending.prev",_pending_block.previous)("next.prev",next_block.previous) ); - FC_ASSERT( _pending_block.timestamp <= next_block.timestamp, "", ("_pending_block.timestamp",_pending_block.timestamp)("next",next_block.timestamp)("blocknum",next_block.block_num()) ); + FC_ASSERT( head_block_id() == next_block.previous, "", ("head_block_id",head_block_id())("next.prev",next_block.previous) ); + FC_ASSERT( head_block_time() < next_block.timestamp, "", ("head_block_time",head_block_time())("next",next_block.timestamp)("blocknum",next_block.block_num()) ); const witness_object& witness = next_block.witness(*this); if( !(skip&skip_witness_signature) ) diff --git a/libraries/chain/db_maint.cpp b/libraries/chain/db_maint.cpp index 0001af71..b6a4ae98 100644 --- a/libraries/chain/db_maint.cpp +++ b/libraries/chain/db_maint.cpp @@ -103,7 +103,7 @@ void database::pay_workers( share_type& budget ) vector> active_workers; get_index_type().inspect_all_objects([this, &active_workers](const object& o) { const worker_object& w = static_cast(o); - auto now = _pending_block.timestamp; + auto now = head_block_time(); if( w.is_active(now) && w.approving_stake(_vote_tally_buffer) > 0 ) active_workers.emplace_back(w); }); @@ -122,10 +122,10 @@ void database::pay_workers( share_type& budget ) { const worker_object& active_worker = active_workers[i]; share_type requested_pay = active_worker.daily_pay; - if( _pending_block.timestamp - get_dynamic_global_properties().last_budget_time != fc::days(1) ) + if( head_block_time() - get_dynamic_global_properties().last_budget_time != fc::days(1) ) { fc::uint128 pay(requested_pay.value); - pay *= (_pending_block.timestamp - get_dynamic_global_properties().last_budget_time).count(); + pay *= (head_block_time() - get_dynamic_global_properties().last_budget_time).count(); pay /= fc::days(1).count(); requested_pay = pay.to_uint64(); } @@ -322,7 +322,7 @@ void database::process_budget() const dynamic_global_property_object& dpo = get_dynamic_global_properties(); const asset_dynamic_data_object& core = asset_id_type(0)(*this).dynamic_asset_data_id(*this); - fc::time_point_sec now = _pending_block.timestamp; + fc::time_point_sec now = head_block_time(); int64_t time_to_maint = (dpo.next_maintenance_time - now).to_seconds(); // @@ -499,19 +499,6 @@ void database::perform_chain_maintenance(const signed_block& next_block, const g } }); - auto new_block_interval = global_props.parameters.block_interval; - - // if block interval CHANGED during this block *THEN* we cannot simply - // add the interval if we want to maintain the invariant that all timestamps are a multiple - // of the interval. - _pending_block.timestamp = next_block.timestamp + fc::seconds(new_block_interval); - uint32_t r = _pending_block.timestamp.sec_since_epoch()%new_block_interval; - if( !r ) - { - _pending_block.timestamp -= r; - assert( (_pending_block.timestamp.sec_since_epoch() % new_block_interval) == 0 ); - } - auto next_maintenance_time = get(dynamic_global_property_id_type()).next_maintenance_time; auto maintenance_interval = gpo.parameters.maintenance_interval; diff --git a/libraries/chain/db_management.cpp b/libraries/chain/db_management.cpp index 2010bc1f..47505942 100644 --- a/libraries/chain/db_management.cpp +++ b/libraries/chain/db_management.cpp @@ -32,9 +32,9 @@ database::database() initialize_evaluators(); } -database::~database(){ - if( _pending_block_session ) - _pending_block_session->commit(); +database::~database() +{ + clear_pending(); } void database::reindex(fc::path data_dir, const genesis_state_type& initial_allocation) @@ -113,9 +113,6 @@ void database::open( if( !find(global_property_id_type()) ) init_genesis(genesis_loader()); - _pending_block.previous = head_block_id(); - _pending_block.timestamp = head_block_time(); - fc::optional last_block = _block_id_to_block.last(); if( last_block.valid() ) { @@ -133,7 +130,8 @@ void database::open( void database::close(uint32_t blocks_to_rewind) { - _pending_block_session.reset(); + // TODO: Save pending tx's on close() + clear_pending(); // pop all of the blocks that we can given our undo history, this should // throw when there is no more undo history to pop diff --git a/libraries/chain/db_update.cpp b/libraries/chain/db_update.cpp index debb05a0..662c5743 100644 --- a/libraries/chain/db_update.cpp +++ b/libraries/chain/db_update.cpp @@ -97,12 +97,6 @@ void database::update_signing_witness(const witness_object& signing_witness, con } ); } -void database::update_pending_block(const signed_block& next_block, uint8_t current_block_interval) -{ - _pending_block.timestamp = next_block.timestamp + current_block_interval; - _pending_block.previous = next_block.id(); -} - void database::clear_expired_transactions() { //Look for expired transactions in the deduplication list, and remove them. diff --git a/libraries/chain/fork_database.cpp b/libraries/chain/fork_database.cpp index 5604d1c4..9d2d9420 100644 --- a/libraries/chain/fork_database.cpp +++ b/libraries/chain/fork_database.cpp @@ -186,6 +186,8 @@ vector fork_database::fetch_block_by_number(uint32_t num)const pair fork_database::fetch_branch_from(block_id_type first, block_id_type second)const { try { + // This function gets a branch (i.e. vector) leading + // back to the most recent common ancestor. pair result; auto first_branch_itr = _index.get().find(first); FC_ASSERT(first_branch_itr != _index.get().end()); diff --git a/libraries/chain/include/graphene/chain/database.hpp b/libraries/chain/include/graphene/chain/database.hpp index 4445654a..af417624 100644 --- a/libraries/chain/include/graphene/chain/database.hpp +++ b/libraries/chain/include/graphene/chain/database.hpp @@ -379,6 +379,11 @@ namespace graphene { namespace chain { */ processed_transaction validate_transaction( const signed_transaction& trx ); + + /** when popping a block, the transactions that were removed get cached here so they + * can be reapplied at the proper time */ + std::deque< signed_transaction > _popped_tx; + /** * @} */ @@ -388,7 +393,7 @@ namespace graphene { namespace chain { void notify_changed_objects(); private: - optional _pending_block_session; + optional _pending_tx_session; vector< unique_ptr > _operation_evaluators; template @@ -413,7 +418,6 @@ namespace graphene { namespace chain { //////////////////// db_update.cpp //////////////////// void update_global_dynamic_data( const signed_block& b ); void update_signing_witness(const witness_object& signing_witness, const signed_block& new_block); - void update_pending_block(const signed_block& next_block, uint8_t current_block_interval); void clear_expired_transactions(); void clear_expired_proposals(); void clear_expired_orders(); @@ -439,7 +443,7 @@ namespace graphene { namespace chain { ///@} ///@} - signed_block _pending_block; + vector< processed_transaction > _pending_tx; fork_database _fork_db; /** diff --git a/libraries/chain/include/graphene/chain/db_with.hpp b/libraries/chain/include/graphene/chain/db_with.hpp index 09781f0f..9a08cdd7 100644 --- a/libraries/chain/include/graphene/chain/db_with.hpp +++ b/libraries/chain/include/graphene/chain/db_with.hpp @@ -56,6 +56,9 @@ struct skip_flags_restorer /** * Class used to help the without_pending_transactions * implementation. + * + * TODO: Change the name of this class to better reflect the fact + * that it restores popped transactions as well as pending transactions. */ struct pending_transactions_restorer { @@ -67,13 +70,27 @@ struct pending_transactions_restorer ~pending_transactions_restorer() { + for( const auto& tx : _db._popped_tx ) + { + try { + if( !_db.is_known_transaction( tx.id() ) ) { + // since push_transaction() takes a signed_transaction, + // the operation_results field will be ignored. + _db._push_transaction( tx ); + } + } catch ( const fc::exception& ) { + } + } + _db._popped_tx.clear(); for( const processed_transaction& tx : _pending_transactions ) { try { - // since push_transaction() takes a signed_transaction, - // the operation_results field will be ignored. - _db.push_transaction( tx ); + if( !_db.is_known_transaction( tx.id() ) ) { + // since push_transaction() takes a signed_transaction, + // the operation_results field will be ignored. + _db._push_transaction( tx ); + } } catch( const fc::exception& e ) { diff --git a/libraries/chain/include/graphene/chain/exceptions.hpp b/libraries/chain/include/graphene/chain/exceptions.hpp index 3860e33b..7bdc7ca5 100644 --- a/libraries/chain/include/graphene/chain/exceptions.hpp +++ b/libraries/chain/include/graphene/chain/exceptions.hpp @@ -82,6 +82,8 @@ namespace graphene { namespace chain { FC_DECLARE_DERIVED_EXCEPTION( invalid_pts_address, graphene::chain::utility_exception, 3060001, "invalid pts address" ) FC_DECLARE_DERIVED_EXCEPTION( insufficient_feeds, graphene::chain::chain_exception, 37006, "insufficient feeds" ) + FC_DECLARE_DERIVED_EXCEPTION( pop_empty_chain, graphene::chain::undo_database_exception, 3070001, "there are no blocks to pop" ) + GRAPHENE_DECLARE_OP_BASE_EXCEPTIONS( transfer ); GRAPHENE_DECLARE_OP_EVALUATE_EXCEPTION( from_account_not_whitelisted, transfer, 1, "owner mismatch" ) GRAPHENE_DECLARE_OP_EVALUATE_EXCEPTION( to_account_not_whitelisted, transfer, 2, "owner mismatch" )