Handle new,changed and removed events from the database_object in one generic function

This commit is contained in:
elmato 2017-02-14 21:39:50 +00:00
parent 0ecdc90d4d
commit 31f322c9cd

View file

@ -182,12 +182,12 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
void broadcast_updates( const vector<variant>& updates ); void broadcast_updates( const vector<variant>& updates );
void broadcast_market_updates( const market_queue_type& queue); void broadcast_market_updates( const market_queue_type& queue);
void check_for_market_objects(const vector<object_id_type>& ids); void handle_object_changed(bool force_notify, const vector<object_id_type>& ids, std::function<const object*(object_id_type id)> find_object);
/** called every time a block is applied to report the objects that were changed */ /** called every time a block is applied to report the objects that were changed */
void on_objects_new(const vector<object_id_type>& ids); void on_objects_new(const vector<object_id_type>& ids);
void on_objects_changed(const vector<object_id_type>& ids); void on_objects_changed(const vector<object_id_type>& ids);
void on_objects_removed(const vector<const object*>& objs); void on_objects_removed(const vector<object_id_type>& ids, const vector<const object*>& objs);
void on_applied_block(); void on_applied_block();
bool _notify_remove_create = false; bool _notify_remove_create = false;
@ -225,8 +225,8 @@ database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db)
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) { _change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) {
on_objects_changed(ids); on_objects_changed(ids);
}); });
_removed_connection = _db.removed_objects.connect([this](const vector<const object*>& objs) { _removed_connection = _db.removed_objects.connect([this](const vector<object_id_type>& ids, const vector<const object*>& objs) {
on_objects_removed(objs); on_objects_removed(ids, objs);
}); });
_applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); }); _applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); });
@ -1849,72 +1849,36 @@ void database_api_impl::broadcast_market_updates( const market_queue_type& queue
} }
} }
void database_api_impl::on_objects_removed( const vector<const object*>& objs ) void database_api_impl::on_objects_removed( const vector<object_id_type>& ids, const vector<const object*>& objs )
{ {
/// we need to ensure the database_api is not deleted for the life of the async operation handle_object_changed(_notify_remove_create, ids, [objs](object_id_type id) -> const object* {
if( _subscribe_callback )
{
vector<variant> updates;
for( auto obj : objs ) auto it = std::find_if(
{ objs.begin(), objs.end(),
if ( is_subscribed_to_item(obj->id) ) [id](const object* o) {return o != nullptr && o->id == id;});
{
updates.push_back( fc::variant(obj->id) );
}
}
broadcast_updates( updates ); if (it != objs.end())
} return *it;
if( _market_subscriptions.size() ) return nullptr;
{ });
market_queue_type broadcast_queue;
for( const auto& obj : objs )
{
if( obj->id.is<call_order_object>() )
{
enqueue_if_subscribed_to_market<call_order_object>( obj, broadcast_queue, false );
}
else if( obj->id.is<limit_order_object>() )
{
enqueue_if_subscribed_to_market<limit_order_object>( obj, broadcast_queue, false );
}
}
broadcast_market_updates(broadcast_queue);
}
}
void database_api_impl::check_for_market_objects(const vector<object_id_type>& ids)
{
if( _market_subscriptions.size() )
{
market_queue_type broadcast_queue;
for(auto id : ids)
{
if( id.is<call_order_object>() )
{
enqueue_if_subscribed_to_market<call_order_object>( _db.find_object(id), broadcast_queue );
}
else if( id.is<limit_order_object>() )
{
enqueue_if_subscribed_to_market<limit_order_object>( _db.find_object(id), broadcast_queue );
}
}
broadcast_market_updates(broadcast_queue);
}
} }
void database_api_impl::on_objects_new(const vector<object_id_type>& ids) void database_api_impl::on_objects_new(const vector<object_id_type>& ids)
{ {
check_for_market_objects(ids); handle_object_changed(_notify_remove_create, ids,
std::bind(&object_database::find_object, &_db, std::placeholders::_1)
);
} }
void database_api_impl::on_objects_changed(const vector<object_id_type>& ids) void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
{
handle_object_changed(false, ids,
std::bind(&object_database::find_object, &_db, std::placeholders::_1)
);
}
void database_api_impl::handle_object_changed(bool force_notify, const vector<object_id_type>& ids, std::function<const object*(object_id_type id)> find_object)
{ {
if( _subscribe_callback ) if( _subscribe_callback )
{ {
@ -1923,9 +1887,9 @@ void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
for(auto id : ids) for(auto id : ids)
{ {
const object* obj = nullptr; const object* obj = nullptr;
if( is_subscribed_to_item(id) ) if( force_notify || is_subscribed_to_item(id) )
{ {
obj = _db.find_object( id ); obj = find_object(id);
if( obj ) if( obj )
{ {
updates.emplace_back( obj->to_variant() ); updates.emplace_back( obj->to_variant() );
@ -1936,7 +1900,24 @@ void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
broadcast_updates(updates); broadcast_updates(updates);
} }
check_for_market_objects(ids); if( _market_subscriptions.size() )
{
market_queue_type broadcast_queue;
for(auto id : ids)
{
if( id.is<call_order_object>() )
{
enqueue_if_subscribed_to_market<call_order_object>( find_object(id), broadcast_queue );
}
else if( id.is<limit_order_object>() )
{
enqueue_if_subscribed_to_market<limit_order_object>( find_object(id), broadcast_queue );
}
}
broadcast_market_updates(broadcast_queue);
}
} }
/** note: this method cannot yield because it is called in the middle of /** note: this method cannot yield because it is called in the middle of