Merge pull request #249 from elmato/no-spam-sub-function

No spam sub function
This commit is contained in:
Vikram Rajkumar 2017-03-23 09:56:23 -05:00 committed by GitHub
commit 8e0b16a0d5
8 changed files with 563 additions and 142 deletions

View file

@ -41,6 +41,8 @@
#define GET_REQUIRED_FEES_MAX_RECURSION 4
typedef std::map< std::pair<graphene::chain::asset_id_type, graphene::chain::asset_id_type>, std::vector<fc::variant> > market_queue_type;
namespace graphene { namespace app {
class database_api_impl;
@ -56,7 +58,7 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
fc::variants get_objects(const vector<object_id_type>& ids)const;
// Subscriptions
void set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter );
void set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create );
void set_pending_transaction_callback( std::function<void(const variant&)> cb );
void set_block_applied_callback( std::function<void(const variant& block_id)> cb );
void cancel_all_subscriptions();
@ -160,22 +162,52 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
{
if( !_subscribe_callback )
return false;
return true;
return _subscribe_filter.contains( i );
}
void broadcast_updates( const vector<variant>& updates );
bool is_impacted_account( const flat_set<account_id_type>& accounts)
{
if( !_subscribed_accounts.size() || !accounts.size() )
return false;
return std::any_of(accounts.begin(), accounts.end(), [this](const account_id_type& account) {
return _subscribed_accounts.find(account) != _subscribed_accounts.end();
});
}
template<typename T>
void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true)
{
const T* order = dynamic_cast<const T*>(obj);
FC_ASSERT( order != nullptr);
auto market = order->get_market();
auto sub = _market_subscriptions.find( market );
if( sub != _market_subscriptions.end() ) {
queue[market].emplace_back( full_object ? obj->to_variant() : fc::variant(obj->id) );
}
}
void broadcast_updates( const vector<variant>& updates );
void broadcast_market_updates( const market_queue_type& queue);
void handle_object_changed(bool force_notify, bool full_object, const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts, std::function<const object*(object_id_type id)> find_object);
/** called every time a block is applied to report the objects that were changed */
void on_objects_changed(const vector<object_id_type>& ids);
void on_objects_removed(const vector<const object*>& objs);
void on_objects_new(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts);
void on_objects_changed(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts);
void on_objects_removed(const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts);
void on_applied_block();
mutable fc::bloom_filter _subscribe_filter;
bool _notify_remove_create = false;
mutable fc::bloom_filter _subscribe_filter;
std::set<account_id_type> _subscribed_accounts;
std::function<void(const fc::variant&)> _subscribe_callback;
std::function<void(const fc::variant&)> _pending_trx_callback;
std::function<void(const fc::variant&)> _block_applied_callback;
boost::signals2::scoped_connection _new_connection;
boost::signals2::scoped_connection _change_connection;
boost::signals2::scoped_connection _removed_connection;
boost::signals2::scoped_connection _applied_block_connection;
@ -198,11 +230,14 @@ database_api::~database_api() {}
database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db)
{
wlog("creating database api ${x}", ("x",int64_t(this)) );
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) {
on_objects_changed(ids);
_new_connection = _db.new_objects.connect([this](const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts) {
on_objects_new(ids, impacted_accounts);
});
_removed_connection = _db.removed_objects.connect([this](const vector<const object*>& objs) {
on_objects_removed(objs);
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts) {
on_objects_changed(ids, impacted_accounts);
});
_removed_connection = _db.removed_objects.connect([this](const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts) {
on_objects_removed(ids, objs, impacted_accounts);
});
_applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); });
@ -258,24 +293,24 @@ fc::variants database_api_impl::get_objects(const vector<object_id_type>& ids)co
// //
//////////////////////////////////////////////////////////////////////
void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create )
{
my->set_subscribe_callback( cb, clear_filter );
my->set_subscribe_callback( cb, notify_remove_create );
}
void database_api_impl::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
void database_api_impl::set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create )
{
edump((clear_filter));
//edump((clear_filter));
_subscribe_callback = cb;
if( clear_filter || !cb )
{
static fc::bloom_parameters param;
param.projected_element_count = 10000;
param.false_positive_probability = 1.0/10000;
param.maximum_size = 1024*8*8*2;
param.compute_optimal_parameters();
_subscribe_filter = fc::bloom_filter(param);
}
_notify_remove_create = notify_remove_create;
_subscribed_accounts.clear();
static fc::bloom_parameters param;
param.projected_element_count = 10000;
param.false_positive_probability = 1.0/100;
param.maximum_size = 1024*8*8*2;
param.compute_optimal_parameters();
_subscribe_filter = fc::bloom_filter(param);
}
void database_api::set_pending_transaction_callback( std::function<void(const variant&)> cb )
@ -568,7 +603,8 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const
if( subscribe )
{
ilog( "subscribe to ${id}", ("id",account->name) );
FC_ASSERT( std::distance(_subscribed_accounts.begin(), _subscribed_accounts.end()) < 100 );
_subscribed_accounts.insert( account->get_id() );
subscribe_to_item( account->id );
}
@ -630,7 +666,7 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const
[&acnt] (const call_order_object& call) {
acnt.call_orders.emplace_back(call);
});
// get assets issued by user
auto asset_range = _db.get_index_type<asset_index>().indices().get<by_issuer>().equal_range(account->id);
std::for_each(asset_range.first, asset_range.second,
@ -1802,110 +1838,109 @@ vector<blinded_balance_object> database_api_impl::get_blinded_balances( const fl
void database_api_impl::broadcast_updates( const vector<variant>& updates )
{
if( updates.size() ) {
if( updates.size() && _subscribe_callback ) {
auto capture_this = shared_from_this();
fc::async([capture_this,updates](){
capture_this->_subscribe_callback( fc::variant(updates) );
if(capture_this->_subscribe_callback)
capture_this->_subscribe_callback( fc::variant(updates) );
});
}
}
void database_api_impl::on_objects_removed( const vector<const object*>& objs )
void database_api_impl::broadcast_market_updates( const market_queue_type& queue)
{
if( queue.size() )
{
auto capture_this = shared_from_this();
fc::async([capture_this, this, queue](){
for( const auto& item : queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
});
}
}
void database_api_impl::on_objects_removed( const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts)
{
handle_object_changed(_notify_remove_create, false, ids, impacted_accounts,
[objs](object_id_type id) -> const object* {
auto it = std::find_if(
objs.begin(), objs.end(),
[id](const object* o) {return o != nullptr && o->id == id;});
if (it != objs.end())
return *it;
return nullptr;
}
);
}
void database_api_impl::on_objects_new(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts)
{
handle_object_changed(_notify_remove_create, true, ids, impacted_accounts,
std::bind(&object_database::find_object, &_db, std::placeholders::_1)
);
}
void database_api_impl::on_objects_changed(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts)
{
handle_object_changed(false, true, ids, impacted_accounts,
std::bind(&object_database::find_object, &_db, std::placeholders::_1)
);
}
void database_api_impl::handle_object_changed(bool force_notify, bool full_object, const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts, std::function<const object*(object_id_type id)> find_object)
{
/// we need to ensure the database_api is not deleted for the life of the async operation
if( _subscribe_callback )
{
vector<variant> updates;
updates.reserve(objs.size());
vector<variant> updates;
for(auto id : ids)
{
if( force_notify || is_subscribed_to_item(id) || is_impacted_account(impacted_accounts) )
{
if( full_object )
{
auto obj = find_object(id);
if( obj )
{
updates.emplace_back( obj->to_variant() );
}
}
else
{
updates.emplace_back( id );
}
}
}
for( auto obj : objs )
updates.emplace_back( obj->id );
broadcast_updates( updates );
broadcast_updates(updates);
}
if( _market_subscriptions.size() )
{
map< pair<asset_id_type, asset_id_type>, vector<variant> > broadcast_queue;
for( const auto& obj : objs )
market_queue_type broadcast_queue;
for(auto id : ids)
{
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
if( order )
if( id.is<call_order_object>() )
{
auto sub = _market_subscriptions.find( order->get_market() );
if( sub != _market_subscriptions.end() )
broadcast_queue[order->get_market()].emplace_back( order->id );
enqueue_if_subscribed_to_market<call_order_object>( find_object(id), broadcast_queue, full_object );
}
else if( id.is<limit_order_object>() )
{
enqueue_if_subscribed_to_market<limit_order_object>( find_object(id), broadcast_queue, full_object );
}
}
if( broadcast_queue.size() )
{
auto capture_this = shared_from_this();
fc::async([capture_this,this,broadcast_queue](){
for( const auto& item : broadcast_queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
});
}
broadcast_market_updates(broadcast_queue);
}
}
void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
{
vector<variant> updates;
map< pair<asset_id_type, asset_id_type>, vector<variant> > market_broadcast_queue;
for(auto id : ids)
{
const object* obj = nullptr;
if( _subscribe_callback )
{
obj = _db.find_object( id );
if( obj )
{
updates.emplace_back( obj->to_variant() );
}
else
{
updates.emplace_back(id); // send just the id to indicate removal
}
}
if( _market_subscriptions.size() )
{
if( !_subscribe_callback )
obj = _db.find_object( id );
if( obj )
{
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
if( order )
{
auto sub = _market_subscriptions.find( order->get_market() );
if( sub != _market_subscriptions.end() )
market_broadcast_queue[order->get_market()].emplace_back( order->id );
}
}
}
}
auto capture_this = shared_from_this();
/// pushing the future back / popping the prior future if it is complete.
/// if a connection hangs then this could get backed up and result in
/// a failure to exit cleanly.
fc::async([capture_this,this,updates,market_broadcast_queue](){
if( _subscribe_callback ) _subscribe_callback( updates );
for( const auto& item : market_broadcast_queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
});
}
/** note: this method cannot yield because it is called in the middle of
* apply a block.
*/

View file

@ -32,3 +32,4 @@
#include "db_market.cpp"
#include "db_update.cpp"
#include "db_witness_schedule.cpp"
#include "db_notify.cpp"

View file

@ -29,6 +29,7 @@
#include <graphene/chain/block_summary_object.hpp>
#include <graphene/chain/global_property_object.hpp>
#include <graphene/chain/operation_history_object.hpp>
#include <graphene/chain/proposal_object.hpp>
#include <graphene/chain/transaction_object.hpp>
#include <graphene/chain/witness_object.hpp>
@ -241,7 +242,7 @@ processed_transaction database::_push_transaction( const signed_transaction& trx
auto processed_trx = _apply_transaction( trx );
_pending_tx.push_back(processed_trx);
notify_changed_objects();
// notify_changed_objects();
// The transaction applied successfully. Merge its changes into the pending block session.
temp_session.merge();
@ -544,24 +545,7 @@ void database::_apply_block( const signed_block& next_block )
notify_changed_objects();
} FC_CAPTURE_AND_RETHROW( (next_block.block_num()) ) }
void database::notify_changed_objects()
{ try {
if( _undo_db.enabled() )
{
const auto& head_undo = _undo_db.head();
vector<object_id_type> changed_ids; changed_ids.reserve(head_undo.old_values.size());
for( const auto& item : head_undo.old_values ) changed_ids.push_back(item.first);
for( const auto& item : head_undo.new_ids ) changed_ids.push_back(item);
vector<const object*> removed;
removed.reserve( head_undo.removed.size() );
for( const auto& item : head_undo.removed )
{
changed_ids.push_back( item.first );
removed.emplace_back( item.second.get() );
}
changed_objects(changed_ids);
}
} FC_CAPTURE_AND_RETHROW() }
processed_transaction database::apply_transaction(const signed_transaction& trx, uint32_t skip)
{

View file

@ -0,0 +1,392 @@
#include <fc/container/flat.hpp>
#include <graphene/chain/protocol/authority.hpp>
#include <graphene/chain/protocol/operations.hpp>
#include <graphene/chain/protocol/transaction.hpp>
#include <graphene/chain/protocol/types.hpp>
#include <graphene/chain/withdraw_permission_object.hpp>
#include <graphene/chain/worker_object.hpp>
#include <graphene/chain/confidential_object.hpp>
#include <graphene/chain/market_object.hpp>
#include <graphene/chain/committee_member_object.hpp>
using namespace fc;
using namespace graphene::chain;
// TODO: Review all of these, especially no-ops
struct get_impacted_account_visitor
{
flat_set<account_id_type>& _impacted;
get_impacted_account_visitor( flat_set<account_id_type>& impact ):_impacted(impact) {}
typedef void result_type;
void operator()( const transfer_operation& op )
{
_impacted.insert( op.to );
}
void operator()( const asset_claim_fees_operation& op ){}
void operator()( const limit_order_create_operation& op ) {}
void operator()( const limit_order_cancel_operation& op )
{
_impacted.insert( op.fee_paying_account );
}
void operator()( const call_order_update_operation& op ) {}
void operator()( const fill_order_operation& op )
{
_impacted.insert( op.account_id );
}
void operator()( const account_create_operation& op )
{
_impacted.insert( op.registrar );
_impacted.insert( op.referrer );
add_authority_accounts( _impacted, op.owner );
add_authority_accounts( _impacted, op.active );
}
void operator()( const account_update_operation& op )
{
_impacted.insert( op.account );
if( op.owner )
add_authority_accounts( _impacted, *(op.owner) );
if( op.active )
add_authority_accounts( _impacted, *(op.active) );
}
void operator()( const account_whitelist_operation& op )
{
_impacted.insert( op.account_to_list );
}
void operator()( const account_upgrade_operation& op ) {}
void operator()( const account_transfer_operation& op )
{
_impacted.insert( op.new_owner );
}
void operator()( const asset_create_operation& op ) {}
void operator()( const asset_update_operation& op )
{
if( op.new_issuer )
_impacted.insert( *(op.new_issuer) );
}
void operator()( const asset_update_bitasset_operation& op ) {}
void operator()( const asset_update_feed_producers_operation& op ) {}
void operator()( const asset_issue_operation& op )
{
_impacted.insert( op.issue_to_account );
}
void operator()( const asset_reserve_operation& op ) {}
void operator()( const asset_fund_fee_pool_operation& op ) {}
void operator()( const asset_settle_operation& op ) {}
void operator()( const asset_global_settle_operation& op ) {}
void operator()( const asset_publish_feed_operation& op ) {}
void operator()( const witness_create_operation& op )
{
_impacted.insert( op.witness_account );
}
void operator()( const witness_update_operation& op )
{
_impacted.insert( op.witness_account );
}
void operator()( const proposal_create_operation& op )
{
vector<authority> other;
for( const auto& proposed_op : op.proposed_ops )
operation_get_required_authorities( proposed_op.op, _impacted, _impacted, other );
for( auto& o : other )
add_authority_accounts( _impacted, o );
}
void operator()( const proposal_update_operation& op ) {}
void operator()( const proposal_delete_operation& op ) {}
void operator()( const withdraw_permission_create_operation& op )
{
_impacted.insert( op.authorized_account );
}
void operator()( const withdraw_permission_update_operation& op )
{
_impacted.insert( op.authorized_account );
}
void operator()( const withdraw_permission_claim_operation& op )
{
_impacted.insert( op.withdraw_from_account );
}
void operator()( const withdraw_permission_delete_operation& op )
{
_impacted.insert( op.authorized_account );
}
void operator()( const committee_member_create_operation& op )
{
_impacted.insert( op.committee_member_account );
}
void operator()( const committee_member_update_operation& op )
{
_impacted.insert( op.committee_member_account );
}
void operator()( const committee_member_update_global_parameters_operation& op ) {}
void operator()( const vesting_balance_create_operation& op )
{
_impacted.insert( op.owner );
}
void operator()( const vesting_balance_withdraw_operation& op ) {}
void operator()( const worker_create_operation& op ) {}
void operator()( const custom_operation& op ) {}
void operator()( const assert_operation& op ) {}
void operator()( const balance_claim_operation& op ) {}
void operator()( const override_transfer_operation& op )
{
_impacted.insert( op.to );
_impacted.insert( op.from );
_impacted.insert( op.issuer );
}
void operator()( const transfer_to_blind_operation& op )
{
_impacted.insert( op.from );
for( const auto& out : op.outputs )
add_authority_accounts( _impacted, out.owner );
}
void operator()( const blind_transfer_operation& op )
{
for( const auto& in : op.inputs )
add_authority_accounts( _impacted, in.owner );
for( const auto& out : op.outputs )
add_authority_accounts( _impacted, out.owner );
}
void operator()( const transfer_from_blind_operation& op )
{
_impacted.insert( op.to );
for( const auto& in : op.inputs )
add_authority_accounts( _impacted, in.owner );
}
void operator()( const asset_settle_cancel_operation& op )
{
_impacted.insert( op.account );
}
void operator()( const fba_distribute_operation& op )
{
_impacted.insert( op.account_id );
}
};
void operation_get_impacted_accounts( const operation& op, flat_set<account_id_type>& result )
{
get_impacted_account_visitor vtor = get_impacted_account_visitor( result );
op.visit( vtor );
}
void transaction_get_impacted_accounts( const transaction& tx, flat_set<account_id_type>& result )
{
for( const auto& op : tx.operations )
operation_get_impacted_accounts( op, result );
}
void get_relevant_accounts( const object* obj, flat_set<account_id_type>& accounts )
{
if( obj->id.space() == protocol_ids )
{
switch( (object_type)obj->id.type() )
{
case null_object_type:
case base_object_type:
case OBJECT_TYPE_COUNT:
return;
case account_object_type:{
accounts.insert( obj->id );
break;
} case asset_object_type:{
const auto& aobj = dynamic_cast<const asset_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->issuer );
break;
} case force_settlement_object_type:{
const auto& aobj = dynamic_cast<const force_settlement_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->owner );
break;
} case committee_member_object_type:{
const auto& aobj = dynamic_cast<const committee_member_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->committee_member_account );
break;
} case witness_object_type:{
const auto& aobj = dynamic_cast<const witness_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->witness_account );
break;
} case limit_order_object_type:{
const auto& aobj = dynamic_cast<const limit_order_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->seller );
break;
} case call_order_object_type:{
const auto& aobj = dynamic_cast<const call_order_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->borrower );
break;
} case custom_object_type:{
break;
} case proposal_object_type:{
const auto& aobj = dynamic_cast<const proposal_object*>(obj);
assert( aobj != nullptr );
transaction_get_impacted_accounts( aobj->proposed_transaction, accounts );
break;
} case operation_history_object_type:{
const auto& aobj = dynamic_cast<const operation_history_object*>(obj);
assert( aobj != nullptr );
operation_get_impacted_accounts( aobj->op, accounts );
break;
} case withdraw_permission_object_type:{
const auto& aobj = dynamic_cast<const withdraw_permission_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->withdraw_from_account );
accounts.insert( aobj->authorized_account );
break;
} case vesting_balance_object_type:{
const auto& aobj = dynamic_cast<const vesting_balance_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->owner );
break;
} case worker_object_type:{
const auto& aobj = dynamic_cast<const worker_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->worker_account );
break;
} case balance_object_type:{
/** these are free from any accounts */
break;
}
}
}
else if( obj->id.space() == implementation_ids )
{
switch( (impl_object_type)obj->id.type() )
{
case impl_global_property_object_type:
break;
case impl_dynamic_global_property_object_type:
break;
case impl_reserved0_object_type:
break;
case impl_asset_dynamic_data_type:
break;
case impl_asset_bitasset_data_type:
break;
case impl_account_balance_object_type:{
const auto& aobj = dynamic_cast<const account_balance_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->owner );
break;
} case impl_account_statistics_object_type:{
const auto& aobj = dynamic_cast<const account_statistics_object*>(obj);
assert( aobj != nullptr );
accounts.insert( aobj->owner );
break;
} case impl_transaction_object_type:{
const auto& aobj = dynamic_cast<const transaction_object*>(obj);
assert( aobj != nullptr );
transaction_get_impacted_accounts( aobj->trx, accounts );
break;
} case impl_blinded_balance_object_type:{
const auto& aobj = dynamic_cast<const blinded_balance_object*>(obj);
assert( aobj != nullptr );
for( const auto& a : aobj->owner.account_auths )
accounts.insert( a.first );
break;
} case impl_block_summary_object_type:
break;
case impl_account_transaction_history_object_type:
break;
case impl_chain_property_object_type:
break;
case impl_witness_schedule_object_type:
break;
case impl_budget_record_object_type:
break;
case impl_special_authority_object_type:
break;
case impl_buyback_object_type:
break;
case impl_fba_accumulator_object_type:
break;
}
}
} // end get_relevant_accounts( const object* obj, flat_set<account_id_type>& accounts )
namespace graphene { namespace chain {
void database::notify_changed_objects()
{ try {
if( _undo_db.enabled() )
{
const auto& head_undo = _undo_db.head();
// New
if( !new_objects.empty() )
{
vector<object_id_type> new_ids; new_ids.reserve(head_undo.new_ids.size());
flat_set<account_id_type> new_accounts_impacted;
for( const auto& item : head_undo.new_ids )
{
new_ids.push_back(item);
auto obj = find_object(item);
if(obj != nullptr)
get_relevant_accounts(obj, new_accounts_impacted);
}
new_objects(new_ids, new_accounts_impacted);
}
// Changed
if( !changed_objects.empty() )
{
vector<object_id_type> changed_ids; changed_ids.reserve(head_undo.old_values.size());
flat_set<account_id_type> changed_accounts_impacted;
for( const auto& item : head_undo.old_values )
{
changed_ids.push_back(item.first);
get_relevant_accounts(item.second.get(), changed_accounts_impacted);
}
changed_objects(changed_ids, changed_accounts_impacted);
}
// Removed
if( !removed_objects.empty() )
{
vector<object_id_type> removed_ids; removed_ids.reserve( head_undo.removed.size() );
vector<const object*> removed; removed.reserve( head_undo.removed.size() );
flat_set<account_id_type> removed_accounts_impacted;
for( const auto& item : head_undo.removed )
{
removed_ids.emplace_back( item.first );
auto obj = item.second.get();
removed.emplace_back( obj );
get_relevant_accounts(obj, removed_accounts_impacted);
}
removed_objects(removed_ids, removed, removed_accounts_impacted);
}
}
} FC_CAPTURE_AND_LOG( () ) }
} }

View file

@ -193,12 +193,18 @@ namespace graphene { namespace chain {
* Emitted After a block has been applied and committed. The callback
* should not yield and should execute quickly.
*/
fc::signal<void(const vector<object_id_type>&)> changed_objects;
fc::signal<void(const vector<object_id_type>&, const flat_set<account_id_type>&)> new_objects;
/**
* Emitted After a block has been applied and committed. The callback
* should not yield and should execute quickly.
*/
fc::signal<void(const vector<object_id_type>&, const flat_set<account_id_type>&)> changed_objects;
/** this signal is emitted any time an object is removed and contains a
* pointer to the last value of every object that was removed.
*/
fc::signal<void(const vector<const object*>&)> removed_objects;
fc::signal<void(const vector<object_id_type>&, const vector<const object*>&, const flat_set<account_id_type>&)> removed_objects;
//////////////////// db_witness_schedule.cpp ////////////////////

View file

@ -120,6 +120,13 @@ class call_order_object : public abstract_object<call_order_object>
share_type collateral; ///< call_price.base.asset_id, access via get_collateral
share_type debt; ///< call_price.quote.asset_id, access via get_collateral
price call_price; ///< Debt / Collateral
pair<asset_id_type,asset_id_type> get_market()const
{
auto tmp = std::make_pair( call_price.base.asset_id, call_price.quote.asset_id );
if( tmp.first > tmp.second ) std::swap( tmp.first, tmp.second );
return tmp;
}
};
/**

View file

@ -98,13 +98,13 @@ void debug_witness_plugin::plugin_startup()
// connect needed signals
_applied_block_conn = db.applied_block.connect([this](const graphene::chain::signed_block& b){ on_applied_block(b); });
_changed_objects_conn = db.changed_objects.connect([this](const std::vector<graphene::db::object_id_type>& ids){ on_changed_objects(ids); });
_removed_objects_conn = db.removed_objects.connect([this](const std::vector<const graphene::db::object*>& objs){ on_removed_objects(objs); });
_changed_objects_conn = db.changed_objects.connect([this](const std::vector<graphene::db::object_id_type>& ids, const fc::flat_set<graphene::chain::account_id_type>& impacted_accounts){ on_changed_objects(ids, impacted_accounts); });
_removed_objects_conn = db.removed_objects.connect([this](const std::vector<graphene::db::object_id_type>& ids, const std::vector<const graphene::db::object*>& objs, const fc::flat_set<graphene::chain::account_id_type>& impacted_accounts){ on_removed_objects(ids, objs, impacted_accounts); });
return;
}
void debug_witness_plugin::on_changed_objects( const std::vector<graphene::db::object_id_type>& ids )
void debug_witness_plugin::on_changed_objects( const std::vector<graphene::db::object_id_type>& ids, const fc::flat_set<graphene::chain::account_id_type>& impacted_accounts )
{
if( _json_object_stream && (ids.size() > 0) )
{
@ -112,11 +112,7 @@ void debug_witness_plugin::on_changed_objects( const std::vector<graphene::db::o
for( const graphene::db::object_id_type& oid : ids )
{
const graphene::db::object* obj = db.find_object( oid );
if( obj == nullptr )
{
(*_json_object_stream) << "{\"id\":" << fc::json::to_string( oid ) << "}\n";
}
else
if( obj != nullptr )
{
(*_json_object_stream) << fc::json::to_string( obj->to_variant() ) << '\n';
}
@ -124,9 +120,8 @@ void debug_witness_plugin::on_changed_objects( const std::vector<graphene::db::o
}
}
void debug_witness_plugin::on_removed_objects( const std::vector<const graphene::db::object*> objs )
void debug_witness_plugin::on_removed_objects( const std::vector<graphene::db::object_id_type>& ids, const std::vector<const graphene::db::object*> objs, const fc::flat_set<graphene::chain::account_id_type>& impacted_accounts )
{
/*
if( _json_object_stream )
{
for( const graphene::db::object* obj : objs )
@ -134,7 +129,6 @@ void debug_witness_plugin::on_removed_objects( const std::vector<const graphene:
(*_json_object_stream) << "{\"id\":" << fc::json::to_string( obj->id ) << "}\n";
}
}
*/
}
void debug_witness_plugin::on_applied_block( const graphene::chain::signed_block& b )

View file

@ -25,8 +25,10 @@
#include <graphene/app/plugin.hpp>
#include <graphene/chain/database.hpp>
#include <graphene/chain/protocol/types.hpp>
#include <fc/thread/future.hpp>
#include <fc/container/flat.hpp>
namespace graphene { namespace debug_witness_plugin {
@ -50,8 +52,8 @@ public:
private:
void on_changed_objects( const std::vector<graphene::db::object_id_type>& ids );
void on_removed_objects( const std::vector<const graphene::db::object*> objs );
void on_changed_objects( const std::vector<graphene::db::object_id_type>& ids, const fc::flat_set<graphene::chain::account_id_type>& impacted_accounts );
void on_removed_objects( const std::vector<graphene::db::object_id_type>& ids, const std::vector<const graphene::db::object*> objs, const fc::flat_set<graphene::chain::account_id_type>& impacted_accounts );
void on_applied_block( const graphene::chain::signed_block& b );
boost::program_options::variables_map _options;