Fix pending_block and fork handling

The pending_block member of database was a premature optimization and had an
unfortunate tendency to get out of sync, especially when switching forks.
This commit removes it, and substantially improves the handling of transactions
when switching forks.  Specifically, flooding or forking no longer causes nodes to
discard valid transactions.
This commit is contained in:
theoreticalbts 2015-09-16 15:50:09 -04:00
parent ec030ee46c
commit e0414d390e
8 changed files with 131 additions and 114 deletions

View file

@@ -106,7 +106,7 @@ bool database::push_block(const signed_block& new_block, uint32_t skip)
bool result;
detail::with_skip_flags( *this, skip, [&]()
{
detail::without_pending_transactions( *this, std::move(_pending_block.transactions),
detail::without_pending_transactions( *this, std::move(_pending_tx),
[&]()
{
result = _push_block(new_block);
@@ -131,7 +131,7 @@ bool database::_push_block(const signed_block& new_block)
//Only switch forks if new_head is actually higher than head
if( new_head->data.block_num() > head_block_num() )
{
auto branches = _fork_db.fetch_branch_from(new_head->data.id(), _pending_block.previous);
auto branches = _fork_db.fetch_branch_from(new_head->data.id(), head_block_id());
// pop blocks until we hit the forked block
while( head_block_id() != branches.second.back()->data.previous )
@@ -214,20 +214,23 @@ processed_transaction database::push_transaction( const signed_transaction& trx,
processed_transaction database::_push_transaction( const signed_transaction& trx )
{
uint32_t skip = get_node_properties().skip_flags;
// If this is the first transaction pushed after applying a block, start a new undo session.
// This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
if( !_pending_block_session ) _pending_block_session = _undo_db.start_undo_session();
auto session = _undo_db.start_undo_session();
auto processed_trx = _apply_transaction( trx );
_pending_block.transactions.push_back(processed_trx);
if( !_pending_tx_session.valid() )
_pending_tx_session = _undo_db.start_undo_session();
FC_ASSERT( (skip & skip_block_size_check) ||
fc::raw::pack_size(_pending_block) <= get_global_properties().parameters.maximum_block_size );
// Create a temporary undo session as a child of _pending_tx_session.
// The temporary session will be discarded by the destructor if
// _apply_transaction fails. If we make it to merge(), we
// apply the changes.
auto temp_session = _undo_db.start_undo_session();
auto processed_trx = _apply_transaction( trx );
_pending_tx.push_back(processed_trx);
notify_changed_objects();
// The transaction applied successfully. Merge its changes into the pending block session.
session.merge();
temp_session.merge();
// notify anyone listening to pending transactions
on_pending_transaction( trx );
@@ -293,68 +296,77 @@ signed_block database::_generate_block(
if( !(skip & skip_witness_signature) )
FC_ASSERT( witness_obj.signing_key == block_signing_private_key.get_public_key() );
_pending_block.timestamp = when;
static const size_t max_block_header_size = fc::raw::pack_size( signed_block_header() ) + 4;
auto maximum_block_size = get_global_properties().parameters.maximum_block_size;
size_t total_block_size = max_block_header_size;
_pending_block.transaction_merkle_root = _pending_block.calculate_merkle_root();
signed_block pending_block;
_pending_block.witness = witness_id;
block_id_type head_id = head_block_id();
if( _pending_block.previous != head_id )
//
// The following code throws away existing pending_tx_session and
// rebuilds it by re-applying pending transactions.
//
// This rebuild is necessary because pending transactions' validity
// and semantics may have changed since they were received, because
// time-based semantics are evaluated based on the current block
// time. These changes can only be reflected in the database when
// the value of the "when" variable is known, which means we need to
// re-apply pending transactions in this method.
//
_pending_tx_session.reset();
_pending_tx_session = _undo_db.start_undo_session();
uint64_t postponed_tx_count = 0;
// pop pending state (reset to head block state)
for( const processed_transaction& tx : _pending_tx )
{
wlog( "_pending_block.previous was ${old}, setting to head_block_id ${new}", ("old", _pending_block.previous)("new", head_id) );
_pending_block.previous = head_id;
}
if( !(skip & skip_witness_signature) ) _pending_block.sign( block_signing_private_key );
size_t new_total_size = total_block_size + fc::raw::pack_size( tx );
FC_ASSERT( fc::raw::pack_size(_pending_block) <= get_global_properties().parameters.maximum_block_size );
signed_block tmp = _pending_block;
tmp.transaction_merkle_root = tmp.calculate_merkle_root();
_pending_block.transactions.clear();
bool failed = false;
try { push_block( tmp, skip ); }
catch ( const undo_database_exception& e ) { throw; }
catch ( const fc::exception& e )
{
if( !retry_on_failure )
// postpone transaction if it would make block too big
if( new_total_size >= maximum_block_size )
{
failed = true;
postponed_tx_count++;
continue;
}
else
try
{
wlog( "Reason for block production failure: ${e}", ("e",e) );
throw;
auto temp_session = _undo_db.start_undo_session();
processed_transaction ptx = _apply_transaction( tx );
temp_session.merge();
// We have to recompute pack_size(ptx) because it may be different
// than pack_size(tx) (i.e. if one or more results increased
// their size)
total_block_size += fc::raw::pack_size( ptx );
pending_block.transactions.push_back( ptx );
}
catch ( const fc::exception& e )
{
// Do nothing, transaction will not be re-applied
wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) );
wlog( "The transaction was ${t}", ("t", tx) );
}
}
if( failed )
if( postponed_tx_count > 0 )
{
uint32_t failed_tx_count = 0;
for( const auto& trx : tmp.transactions )
{
try
{
push_transaction( trx, skip );
}
catch ( const fc::exception& e )
{
wlog( "Transaction is no longer valid: ${trx}", ("trx",trx) );
failed_tx_count++;
}
}
if( failed_tx_count == 0 )
{
//
// this is in generate_block() so this intensive logging
// (dumping a whole block) should be rate-limited
// to once per block production attempt
//
// TODO: Turn this off again once #261 is resolved.
//
wlog( "Block creation failed even though all tx's are still valid. Block: ${b}", ("b",tmp) );
}
return _generate_block( when, witness_id, block_signing_private_key, false );
wlog( "Postponed ${n} transactions due to block size limit", ("n", postponed_tx_count) );
}
return tmp;
_pending_tx_session.reset();
pending_block.previous = head_block_id();
pending_block.timestamp = when;
pending_block.transaction_merkle_root = pending_block.calculate_merkle_root();
pending_block.witness = witness_id;
if( !(skip & skip_witness_signature) )
pending_block.sign( block_signing_private_key );
FC_ASSERT( fc::raw::pack_size(pending_block) <= get_global_properties().parameters.maximum_block_size );
push_block( pending_block, skip );
return pending_block;
} FC_CAPTURE_AND_RETHROW( (witness_id) ) }
/**
@@ -363,19 +375,23 @@ signed_block database::_generate_block(
*/
void database::pop_block()
{ try {
_pending_block_session.reset();
auto prev = _pending_block.previous;
_pending_tx_session.reset();
auto head_id = head_block_id();
optional<signed_block> head_block = fetch_block_by_id( head_id );
GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" );
pop_undo();
_block_id_to_block.remove( prev );
_pending_block.previous = head_block_id();
_pending_block.timestamp = head_block_time();
_block_id_to_block.remove( head_id );
_fork_db.pop_block();
_popped_tx.insert( _popped_tx.begin(), head_block->transactions.begin(), head_block->transactions.end() );
} FC_CAPTURE_AND_RETHROW() }
void database::clear_pending()
{ try {
_pending_block.transactions.clear();
_pending_block_session.reset();
assert( (_pending_tx.size() == 0) || _pending_tx_session.valid() );
_pending_tx.clear();
_pending_tx_session.reset();
} FC_CAPTURE_AND_RETHROW() }
uint32_t database::push_applied_operation( const operation& op )
@@ -451,11 +467,8 @@ void database::_apply_block( const signed_block& next_block )
update_global_dynamic_data(next_block);
update_signing_witness(signing_witness, next_block);
auto current_block_interval = global_props.parameters.block_interval;
// Are we at the maintenance interval?
if( maint_needed )
// This will update _pending_block.timestamp if the block interval has changed
perform_chain_maintenance(next_block, global_props);
create_block_summary(next_block);
@@ -477,8 +490,6 @@ void database::_apply_block( const signed_block& next_block )
_applied_ops.clear();
notify_changed_objects();
update_pending_block(next_block, current_block_interval);
} FC_CAPTURE_AND_RETHROW( (next_block.block_num()) ) }
void database::notify_changed_objects()
@@ -542,9 +553,11 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx
FC_ASSERT( trx.ref_block_prefix == tapos_block_summary.block_id._hash[1] );
}
FC_ASSERT( trx.expiration <= _pending_block.timestamp + chain_parameters.maximum_time_until_expiration, "",
("trx.expiration",trx.expiration)("_pending_block.timestamp",_pending_block.timestamp)("max_til_exp",chain_parameters.maximum_time_until_expiration));
FC_ASSERT( _pending_block.timestamp <= trx.expiration, "", ("pending.timestamp",_pending_block.timestamp)("trx.exp",trx.expiration) );
fc::time_point_sec now = head_block_time();
FC_ASSERT( trx.expiration <= now + chain_parameters.maximum_time_until_expiration, "",
("trx.expiration",trx.expiration)("now",now)("max_til_exp",chain_parameters.maximum_time_until_expiration));
FC_ASSERT( now <= trx.expiration, "", ("now",now)("trx.exp",trx.expiration) );
}
//Insert transaction into unique transactions database.
@@ -595,8 +608,8 @@ operation_result database::apply_operation(transaction_evaluation_state& eval_st
const witness_object& database::validate_block_header( uint32_t skip, const signed_block& next_block )const
{
FC_ASSERT( _pending_block.previous == next_block.previous, "", ("pending.prev",_pending_block.previous)("next.prev",next_block.previous) );
FC_ASSERT( _pending_block.timestamp <= next_block.timestamp, "", ("_pending_block.timestamp",_pending_block.timestamp)("next",next_block.timestamp)("blocknum",next_block.block_num()) );
FC_ASSERT( head_block_id() == next_block.previous, "", ("head_block_id",head_block_id())("next.prev",next_block.previous) );
FC_ASSERT( head_block_time() < next_block.timestamp, "", ("head_block_time",head_block_time())("next",next_block.timestamp)("blocknum",next_block.block_num()) );
const witness_object& witness = next_block.witness(*this);
if( !(skip&skip_witness_signature) )

View file

@@ -103,7 +103,7 @@ void database::pay_workers( share_type& budget )
vector<std::reference_wrapper<const worker_object>> active_workers;
get_index_type<worker_index>().inspect_all_objects([this, &active_workers](const object& o) {
const worker_object& w = static_cast<const worker_object&>(o);
auto now = _pending_block.timestamp;
auto now = head_block_time();
if( w.is_active(now) && w.approving_stake(_vote_tally_buffer) > 0 )
active_workers.emplace_back(w);
});
@@ -122,10 +122,10 @@ void database::pay_workers( share_type& budget )
{
const worker_object& active_worker = active_workers[i];
share_type requested_pay = active_worker.daily_pay;
if( _pending_block.timestamp - get_dynamic_global_properties().last_budget_time != fc::days(1) )
if( head_block_time() - get_dynamic_global_properties().last_budget_time != fc::days(1) )
{
fc::uint128 pay(requested_pay.value);
pay *= (_pending_block.timestamp - get_dynamic_global_properties().last_budget_time).count();
pay *= (head_block_time() - get_dynamic_global_properties().last_budget_time).count();
pay /= fc::days(1).count();
requested_pay = pay.to_uint64();
}
@@ -322,7 +322,7 @@ void database::process_budget()
const dynamic_global_property_object& dpo = get_dynamic_global_properties();
const asset_dynamic_data_object& core =
asset_id_type(0)(*this).dynamic_asset_data_id(*this);
fc::time_point_sec now = _pending_block.timestamp;
fc::time_point_sec now = head_block_time();
int64_t time_to_maint = (dpo.next_maintenance_time - now).to_seconds();
//
@@ -499,19 +499,6 @@ void database::perform_chain_maintenance(const signed_block& next_block, const g
}
});
auto new_block_interval = global_props.parameters.block_interval;
// if block interval CHANGED during this block *THEN* we cannot simply
// add the interval if we want to maintain the invariant that all timestamps are a multiple
// of the interval.
_pending_block.timestamp = next_block.timestamp + fc::seconds(new_block_interval);
uint32_t r = _pending_block.timestamp.sec_since_epoch()%new_block_interval;
if( !r )
{
_pending_block.timestamp -= r;
assert( (_pending_block.timestamp.sec_since_epoch() % new_block_interval) == 0 );
}
auto next_maintenance_time = get<dynamic_global_property_object>(dynamic_global_property_id_type()).next_maintenance_time;
auto maintenance_interval = gpo.parameters.maintenance_interval;

View file

@@ -32,9 +32,9 @@ database::database()
initialize_evaluators();
}
database::~database(){
if( _pending_block_session )
_pending_block_session->commit();
database::~database()
{
clear_pending();
}
void database::reindex(fc::path data_dir, const genesis_state_type& initial_allocation)
@@ -113,9 +113,6 @@ void database::open(
if( !find(global_property_id_type()) )
init_genesis(genesis_loader());
_pending_block.previous = head_block_id();
_pending_block.timestamp = head_block_time();
fc::optional<signed_block> last_block = _block_id_to_block.last();
if( last_block.valid() )
{
@@ -133,7 +130,8 @@ void database::open(
void database::close(uint32_t blocks_to_rewind)
{
_pending_block_session.reset();
// TODO: Save pending tx's on close()
clear_pending();
// pop all of the blocks that we can given our undo history, this should
// throw when there is no more undo history to pop

View file

@@ -97,12 +97,6 @@ void database::update_signing_witness(const witness_object& signing_witness, con
} );
}
void database::update_pending_block(const signed_block& next_block, uint8_t current_block_interval)
{
_pending_block.timestamp = next_block.timestamp + current_block_interval;
_pending_block.previous = next_block.id();
}
void database::clear_expired_transactions()
{
//Look for expired transactions in the deduplication list, and remove them.

View file

@@ -186,6 +186,8 @@ vector<item_ptr> fork_database::fetch_block_by_number(uint32_t num)const
pair<fork_database::branch_type,fork_database::branch_type>
fork_database::fetch_branch_from(block_id_type first, block_id_type second)const
{ try {
// This function gets a branch (i.e. vector<fork_item>) leading
// back to the most recent common ancestor.
pair<branch_type,branch_type> result;
auto first_branch_itr = _index.get<block_id>().find(first);
FC_ASSERT(first_branch_itr != _index.get<block_id>().end());

View file

@@ -379,6 +379,11 @@ namespace graphene { namespace chain {
*/
processed_transaction validate_transaction( const signed_transaction& trx );
/** when popping a block, the transactions that were removed get cached here so they
* can be reapplied at the proper time */
std::deque< signed_transaction > _popped_tx;
/**
* @}
*/
@@ -388,7 +393,7 @@ namespace graphene { namespace chain {
void notify_changed_objects();
private:
optional<undo_database::session> _pending_block_session;
optional<undo_database::session> _pending_tx_session;
vector< unique_ptr<op_evaluator> > _operation_evaluators;
template<class Index>
@@ -413,7 +418,6 @@ namespace graphene { namespace chain {
//////////////////// db_update.cpp ////////////////////
void update_global_dynamic_data( const signed_block& b );
void update_signing_witness(const witness_object& signing_witness, const signed_block& new_block);
void update_pending_block(const signed_block& next_block, uint8_t current_block_interval);
void clear_expired_transactions();
void clear_expired_proposals();
void clear_expired_orders();
@@ -439,7 +443,7 @@ namespace graphene { namespace chain {
///@}
///@}
signed_block _pending_block;
vector< processed_transaction > _pending_tx;
fork_database _fork_db;
/**

View file

@@ -56,6 +56,9 @@ struct skip_flags_restorer
/**
* Class used to help the without_pending_transactions
* implementation.
*
* TODO: Change the name of this class to better reflect the fact
* that it restores popped transactions as well as pending transactions.
*/
struct pending_transactions_restorer
{
@@ -67,13 +70,27 @@ struct pending_transactions_restorer
~pending_transactions_restorer()
{
for( const auto& tx : _db._popped_tx )
{
try {
if( !_db.is_known_transaction( tx.id() ) ) {
// since push_transaction() takes a signed_transaction,
// the operation_results field will be ignored.
_db._push_transaction( tx );
}
} catch ( const fc::exception& ) {
}
}
_db._popped_tx.clear();
for( const processed_transaction& tx : _pending_transactions )
{
try
{
// since push_transaction() takes a signed_transaction,
// the operation_results field will be ignored.
_db.push_transaction( tx );
if( !_db.is_known_transaction( tx.id() ) ) {
// since push_transaction() takes a signed_transaction,
// the operation_results field will be ignored.
_db._push_transaction( tx );
}
}
catch( const fc::exception& e )
{

View file

@@ -82,6 +82,8 @@ namespace graphene { namespace chain {
FC_DECLARE_DERIVED_EXCEPTION( invalid_pts_address, graphene::chain::utility_exception, 3060001, "invalid pts address" )
FC_DECLARE_DERIVED_EXCEPTION( insufficient_feeds, graphene::chain::chain_exception, 37006, "insufficient feeds" )
FC_DECLARE_DERIVED_EXCEPTION( pop_empty_chain, graphene::chain::undo_database_exception, 3070001, "there are no blocks to pop" )
GRAPHENE_DECLARE_OP_BASE_EXCEPTIONS( transfer );
GRAPHENE_DECLARE_OP_EVALUATE_EXCEPTION( from_account_not_whitelisted, transfer, 1, "owner mismatch" )
GRAPHENE_DECLARE_OP_EVALUATE_EXCEPTION( to_account_not_whitelisted, transfer, 2, "owner mismatch" )