rollback on save

This commit is contained in:
Daniel Larimer 2015-09-03 17:42:52 -04:00
parent 86bb4cdbca
commit a5071f2568
5 changed files with 51 additions and 66 deletions

View file

@ -228,6 +228,7 @@ namespace graphene { namespace app {
std::map<std::string, full_account> database_api::get_full_accounts( const vector<std::string>& names_or_ids, bool subscribe)
{
idump((names_or_ids));
std::map<std::string, full_account> results;
for (const std::string& account_name_or_id : names_or_ids)
@ -801,35 +802,27 @@ namespace graphene { namespace app {
} // end get_relevant_accounts( obj )
void database_api::broadcast_updates( const vector<variant>& updates )
{
if( updates.size() ) {
auto capture_this = shared_from_this();
fc::async([capture_this,updates](){
capture_this->_subscribe_callback( fc::variant(updates) );
});
}
}
void database_api::on_objects_removed( const vector<const object*>& objs )
{
/// we need to ensure the database_api is not deleted for the life of the async operation
auto capture_this = shared_from_this();
if( _subscribe_callback )
{
map<account_id_type, vector<variant> > broadcast_queue;
for( const auto& obj : objs )
{
auto relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
if( _subscribe_filter.contains(r) )
{
broadcast_queue[r].emplace_back(obj->to_variant());
break;
}
}
if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) )
broadcast_queue[account_id_type()].emplace_back(obj->to_variant());
}
vector<variant> updates;
updates.reserve(objs.size());
if( broadcast_queue.size() )
{
fc::async([capture_this,broadcast_queue,this](){
_subscribe_callback( fc::variant(broadcast_queue) );
});
}
for( auto obj : objs )
updates.emplace_back( obj->id );
broadcast_updates( updates );
}
if( _market_subscriptions.size() )
@ -847,6 +840,7 @@ namespace graphene { namespace app {
}
if( broadcast_queue.size() )
{
auto capture_this = shared_from_this();
fc::async([capture_this,this,broadcast_queue](){
for( const auto& item : broadcast_queue )
{
@ -864,7 +858,6 @@ namespace graphene { namespace app {
vector<variant> updates;
map< pair<asset_id_type, asset_id_type>, vector<variant> > market_broadcast_queue;
idump((ids));
for(auto id : ids)
{
const object* obj = nullptr;
@ -873,47 +866,15 @@ namespace graphene { namespace app {
obj = _db.find_object( id );
if( obj )
{
auto acnt = dynamic_cast<const account_object*>(obj);
if( acnt )
{
bool added_account = false;
for( const auto& key : acnt->owner.key_auths )
if( is_subscribed_to_item( key.first ) )
{
updates.emplace_back( obj->to_variant() );
added_account = true;
break;
}
for( const auto& key : acnt->active.key_auths )
if( is_subscribed_to_item( key.first ) )
{
updates.emplace_back( obj->to_variant() );
added_account = true;
break;
}
if( added_account )
continue;
}
vector<account_id_type> relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
if( _subscribe_filter.contains(r) )
{
updates.emplace_back(obj->to_variant());
break;
}
}
if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) )
updates.emplace_back(obj->to_variant());
updates.emplace_back( obj->to_variant() );
}
else
{
if( _subscribe_filter.contains(id) )
updates.emplace_back(id); // send just the id to indicate removal
updates.emplace_back(id); // send just the id to indicate removal
}
}
/*
if( _market_subscriptions.size() )
{
if( !_subscribe_callback )
@ -929,6 +890,7 @@ namespace graphene { namespace app {
}
}
}
*/
}
auto capture_this = shared_from_this();
@ -1223,14 +1185,18 @@ namespace graphene { namespace app {
set<public_key_type> database_api::get_required_signatures( const signed_transaction& trx, const flat_set<public_key_type>& available_keys )const
{
return trx.get_required_signatures( _db.get_chain_id(),
wdump((trx)(available_keys));
auto result = trx.get_required_signatures( _db.get_chain_id(),
available_keys,
[&]( account_id_type id ){ return &id(_db).active; },
[&]( account_id_type id ){ return &id(_db).owner; },
_db.get_global_properties().parameters.max_authority_depth );
wdump((result));
return result;
}
set<public_key_type> database_api::get_potential_signatures( const signed_transaction& trx )const
{
wdump((trx));
set<public_key_type> result;
trx.get_required_signatures( _db.get_chain_id(),
flat_set<public_key_type>(),
@ -1247,6 +1213,7 @@ namespace graphene { namespace app {
},
_db.get_global_properties().parameters.max_authority_depth );
wdump((result));
return result;
}

View file

@ -360,16 +360,24 @@ namespace graphene { namespace app {
template<typename T>
void subscribe_to_item( const T& i )const
{
auto vec = fc::raw::pack(i);
if( !_subscribe_callback ) return;
_subscribe_filter.insert( (const char*)&i, sizeof(i) );
if( !is_subscribed_to_item(i) )
{
idump((i));
_subscribe_filter.insert( vec.data(), vec.size() );//(vecconst char*)&i, sizeof(i) );
}
}
template<typename T>
bool is_subscribed_to_item( const T& i )const
{
if( !_subscribe_callback ) return false;
return true;
return _subscribe_filter.contains( i );
}
void broadcast_updates( const vector<variant>& updates );
/** called every time a block is applied to report the objects that were changed */
void on_objects_changed(const vector<object_id_type>& ids);

View file

@ -212,9 +212,11 @@ optional<signed_block> block_database::last()const
_block_num_to_pos.seekg( -sizeof(index_entry), _block_num_to_pos.end );
_block_num_to_pos.read( (char*)&e, sizeof(e) );
while( e.block_size == 0 && _blocks.tellg() > 0 )
uint64_t pos = _block_num_to_pos.tellg();
while( e.block_size == 0 && pos > 0 )
{
_block_num_to_pos.seekg( -sizeof(index_entry), _block_num_to_pos.cur );
pos -= sizeof(index_entry);
_block_num_to_pos.seekg( pos );
_block_num_to_pos.read( (char*)&e, sizeof(e) );
}

View file

@ -331,8 +331,9 @@ signed_block database::_generate_block(
void database::pop_block()
{ try {
_pending_block_session.reset();
_block_id_to_block.remove( _pending_block.previous );
auto prev = _pending_block.previous;
pop_undo();
_block_id_to_block.remove( prev );
_pending_block.previous = head_block_id();
_pending_block.timestamp = head_block_time();
_fork_db.pop_block();

View file

@ -45,12 +45,15 @@ void database::reindex(fc::path data_dir, const genesis_state_type& initial_allo
auto start = fc::time_point::now();
auto last_block = _block_id_to_block.last();
if( !last_block ) return;
if( !last_block ) {
elog( "!no last block" );
edump((last_block));
return;
}
const auto last_block_num = last_block->block_num();
ilog( "Replaying blocks..." );
// TODO: disable undo tracking during reindex, this currently causes crashes in the benchmark test
_undo_db.disable();
for( uint32_t i = 1; i <= last_block_num; ++i )
{
@ -107,6 +110,10 @@ void database::close(uint32_t blocks_to_rewind)
for(uint32_t i = 0; i < blocks_to_rewind && head_block_num() > 0; ++i)
pop_block();
// pop all of the blocks that we can given our undo history, this should
// throw when there is no more undo history to pop
try { while( true ) { elog("pop"); pop_block(); } } catch (...){}
object_database::flush();
object_database::close();