Keep track of subscribed accounts and check for events that reference them

This commit is contained in:
elmato 2017-03-08 01:50:01 +00:00
parent 6d3fbeef55
commit aab5fddb2f

View file

@ -166,6 +166,16 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
return _subscribe_filter.contains( i ); return _subscribe_filter.contains( i );
} }
bool is_impacted_account( const flat_set<account_id_type>& accounts)
{
if( !_subscribed_accounts.size() || !accounts.size() )
return false;
return std::any_of(accounts.begin(), accounts.end(), [this](const account_id_type& account) {
return _subscribed_accounts.find(account) != _subscribed_accounts.end();
});
}
template<typename T> template<typename T>
void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true) void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true)
{ {
@ -182,16 +192,17 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
void broadcast_updates( const vector<variant>& updates ); void broadcast_updates( const vector<variant>& updates );
void broadcast_market_updates( const market_queue_type& queue); void broadcast_market_updates( const market_queue_type& queue);
void handle_object_changed(bool force_notify, const vector<object_id_type>& ids, std::function<const object*(object_id_type id)> find_object); void handle_object_changed(bool force_notify, bool full_object, const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts, std::function<const object*(object_id_type id)> find_object);
/** called every time a block is applied to report the objects that were changed */ /** called every time a block is applied to report the objects that were changed */
void on_objects_new(const vector<object_id_type>& ids); void on_objects_new(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts);
void on_objects_changed(const vector<object_id_type>& ids); void on_objects_changed(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts);
void on_objects_removed(const vector<object_id_type>& ids, const vector<const object*>& objs); void on_objects_removed(const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts);
void on_applied_block(); void on_applied_block();
bool _notify_remove_create = false; bool _notify_remove_create = false;
mutable fc::bloom_filter _subscribe_filter; mutable fc::bloom_filter _subscribe_filter;
std::set<account_id_type> _subscribed_accounts;
std::function<void(const fc::variant&)> _subscribe_callback; std::function<void(const fc::variant&)> _subscribe_callback;
std::function<void(const fc::variant&)> _pending_trx_callback; std::function<void(const fc::variant&)> _pending_trx_callback;
std::function<void(const fc::variant&)> _block_applied_callback; std::function<void(const fc::variant&)> _block_applied_callback;
@ -219,14 +230,14 @@ database_api::~database_api() {}
database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db) database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db)
{ {
wlog("creating database api ${x}", ("x",int64_t(this)) ); wlog("creating database api ${x}", ("x",int64_t(this)) );
_new_connection = _db.new_objects.connect([this](const vector<object_id_type>& ids) { _new_connection = _db.new_objects.connect([this](const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts) {
on_objects_new(ids); on_objects_new(ids, impacted_accounts);
}); });
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) { _change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts) {
on_objects_changed(ids); on_objects_changed(ids, impacted_accounts);
}); });
_removed_connection = _db.removed_objects.connect([this](const vector<object_id_type>& ids, const vector<const object*>& objs) { _removed_connection = _db.removed_objects.connect([this](const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts) {
on_objects_removed(ids, objs); on_objects_removed(ids, objs, impacted_accounts);
}); });
_applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); }); _applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); });
@ -292,6 +303,7 @@ void database_api_impl::set_subscribe_callback( std::function<void(const variant
//edump((clear_filter)); //edump((clear_filter));
_subscribe_callback = cb; _subscribe_callback = cb;
_notify_remove_create = notify_remove_create; _notify_remove_create = notify_remove_create;
_subscribed_accounts.clear();
static fc::bloom_parameters param; static fc::bloom_parameters param;
param.projected_element_count = 10000; param.projected_element_count = 10000;
@ -591,6 +603,8 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const
if( subscribe ) if( subscribe )
{ {
FC_ASSERT( std::distance(_subscribed_accounts.begin(), _subscribed_accounts.end()) < 100 );
_subscribed_accounts.insert( account->get_id() );
subscribe_to_item( account->id ); subscribe_to_item( account->id );
} }
@ -1849,36 +1863,37 @@ void database_api_impl::broadcast_market_updates( const market_queue_type& queue
} }
} }
void database_api_impl::on_objects_removed( const vector<object_id_type>& ids, const vector<const object*>& objs ) void database_api_impl::on_objects_removed( const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts)
{ {
handle_object_changed(_notify_remove_create, ids, [objs](object_id_type id) -> const object* { handle_object_changed(_notify_remove_create, false, ids, impacted_accounts,
[objs](object_id_type id) -> const object* {
auto it = std::find_if(
objs.begin(), objs.end(),
[id](const object* o) {return o != nullptr && o->id == id;});
auto it = std::find_if( if (it != objs.end())
objs.begin(), objs.end(), return *it;
[id](const object* o) {return o != nullptr && o->id == id;});
if (it != objs.end()) return nullptr;
return *it; }
);
return nullptr;
});
} }
void database_api_impl::on_objects_new(const vector<object_id_type>& ids) void database_api_impl::on_objects_new(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts)
{ {
handle_object_changed(_notify_remove_create, ids, handle_object_changed(_notify_remove_create, true, ids, impacted_accounts,
std::bind(&object_database::find_object, &_db, std::placeholders::_1) std::bind(&object_database::find_object, &_db, std::placeholders::_1)
); );
} }
void database_api_impl::on_objects_changed(const vector<object_id_type>& ids) void database_api_impl::on_objects_changed(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts)
{ {
handle_object_changed(false, ids, handle_object_changed(false, true, ids, impacted_accounts,
std::bind(&object_database::find_object, &_db, std::placeholders::_1) std::bind(&object_database::find_object, &_db, std::placeholders::_1)
); );
} }
void database_api_impl::handle_object_changed(bool force_notify, const vector<object_id_type>& ids, std::function<const object*(object_id_type id)> find_object) void database_api_impl::handle_object_changed(bool force_notify, bool full_object, const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts, std::function<const object*(object_id_type id)> find_object)
{ {
if( _subscribe_callback ) if( _subscribe_callback )
{ {
@ -1886,13 +1901,19 @@ void database_api_impl::handle_object_changed(bool force_notify, const vector<ob
for(auto id : ids) for(auto id : ids)
{ {
const object* obj = nullptr; if( force_notify || is_subscribed_to_item(id) || is_impacted_account(impacted_accounts) )
if( force_notify || is_subscribed_to_item(id) )
{ {
obj = find_object(id); if( full_object )
if( obj )
{ {
updates.emplace_back( obj->to_variant() ); auto obj = find_object(id);
if( obj )
{
updates.emplace_back( obj->to_variant() );
}
}
else
{
updates.emplace_back( id );
} }
} }
} }
@ -1908,11 +1929,11 @@ void database_api_impl::handle_object_changed(bool force_notify, const vector<ob
{ {
if( id.is<call_order_object>() ) if( id.is<call_order_object>() )
{ {
enqueue_if_subscribed_to_market<call_order_object>( find_object(id), broadcast_queue ); enqueue_if_subscribed_to_market<call_order_object>( find_object(id), broadcast_queue, full_object );
} }
else if( id.is<limit_order_object>() ) else if( id.is<limit_order_object>() )
{ {
enqueue_if_subscribed_to_market<limit_order_object>( find_object(id), broadcast_queue ); enqueue_if_subscribed_to_market<limit_order_object>( find_object(id), broadcast_queue, full_object );
} }
} }