handle new database_object signals, refactor

This commit is contained in:
elmato 2017-02-10 08:36:07 +00:00
parent 29c636fcef
commit bfa600c559

View file

@ -41,6 +41,8 @@
#define GET_REQUIRED_FEES_MAX_RECURSION 4 #define GET_REQUIRED_FEES_MAX_RECURSION 4
typedef std::map< std::pair<graphene::chain::asset_id_type, graphene::chain::asset_id_type>, std::vector<fc::variant> > market_queue_type;
namespace graphene { namespace app { namespace graphene { namespace app {
class database_api_impl; class database_api_impl;
@ -164,9 +166,26 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
return _subscribe_filter.contains( i ); return _subscribe_filter.contains( i );
} }
void broadcast_updates( const vector<variant>& updates ); template<typename T>
void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true)
{
const T* order = dynamic_cast<const T*>(obj);
FC_ASSERT( order != nullptr);
auto market = order->get_market();
auto sub = _market_subscriptions.find( market );
if( sub != _market_subscriptions.end() ) {
queue[market].emplace_back( full_object ? obj->to_variant() : fc::variant(obj->id) );
}
}
void broadcast_updates( const vector<variant>& updates );
void broadcast_market_updates( const market_queue_type& queue);
void check_for_market_objects(const vector<object_id_type>& ids);
/** called every time a block is applied to report the objects that were changed */ /** called every time a block is applied to report the objects that were changed */
void on_objects_new(const vector<object_id_type>& ids);
void on_objects_changed(const vector<object_id_type>& ids); void on_objects_changed(const vector<object_id_type>& ids);
void on_objects_removed(const vector<const object*>& objs); void on_objects_removed(const vector<const object*>& objs);
void on_applied_block(); void on_applied_block();
@ -176,6 +195,7 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
std::function<void(const fc::variant&)> _pending_trx_callback; std::function<void(const fc::variant&)> _pending_trx_callback;
std::function<void(const fc::variant&)> _block_applied_callback; std::function<void(const fc::variant&)> _block_applied_callback;
boost::signals2::scoped_connection _new_connection;
boost::signals2::scoped_connection _change_connection; boost::signals2::scoped_connection _change_connection;
boost::signals2::scoped_connection _removed_connection; boost::signals2::scoped_connection _removed_connection;
boost::signals2::scoped_connection _applied_block_connection; boost::signals2::scoped_connection _applied_block_connection;
@ -198,6 +218,9 @@ database_api::~database_api() {}
database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db) database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db)
{ {
wlog("creating database api ${x}", ("x",int64_t(this)) ); wlog("creating database api ${x}", ("x",int64_t(this)) );
_new_connection = _db.new_objects.connect([this](const vector<object_id_type>& ids) {
on_objects_new(ids);
});
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) { _change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) {
on_objects_changed(ids); on_objects_changed(ids);
}); });
@ -628,7 +651,7 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const
[&acnt] (const call_order_object& call) { [&acnt] (const call_order_object& call) {
acnt.call_orders.emplace_back(call); acnt.call_orders.emplace_back(call);
}); });
// get assets issued by user // get assets issued by user
auto asset_range = _db.get_index_type<asset_index>().indices().get<by_issuer>().equal_range(account->id); auto asset_range = _db.get_index_type<asset_index>().indices().get<by_issuer>().equal_range(account->id);
std::for_each(asset_range.first, asset_range.second, std::for_each(asset_range.first, asset_range.second,
@ -1800,10 +1823,27 @@ vector<blinded_balance_object> database_api_impl::get_blinded_balances( const fl
void database_api_impl::broadcast_updates( const vector<variant>& updates ) void database_api_impl::broadcast_updates( const vector<variant>& updates )
{ {
if( updates.size() ) { if( updates.size() && _subscribe_callback ) {
auto capture_this = shared_from_this(); auto capture_this = shared_from_this();
fc::async([capture_this,updates](){ fc::async([capture_this,updates](){
capture_this->_subscribe_callback( fc::variant(updates) ); if(capture_this->_subscribe_callback)
capture_this->_subscribe_callback( fc::variant(updates) );
});
}
}
void database_api_impl::broadcast_market_updates( const market_queue_type& queue)
{
if( queue.size() )
{
auto capture_this = shared_from_this();
fc::async([capture_this, this, queue](){
for( const auto& item : queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
}); });
} }
} }
@ -1813,97 +1853,89 @@ void database_api_impl::on_objects_removed( const vector<const object*>& objs )
/// we need to ensure the database_api is not deleted for the life of the async operation /// we need to ensure the database_api is not deleted for the life of the async operation
if( _subscribe_callback ) if( _subscribe_callback )
{ {
vector<variant> updates; vector<variant> updates;
updates.reserve(objs.size());
for( auto obj : objs ) { for( auto obj : objs )
{
if ( is_subscribed_to_item(obj->id) ) if ( is_subscribed_to_item(obj->id) )
updates.emplace_back( obj->id ); {
updates.push_back( fc::variant(obj->id) );
}
} }
broadcast_updates( updates ); broadcast_updates( updates );
} }
if( _market_subscriptions.size() ) if( _market_subscriptions.size() )
{ {
map< pair<asset_id_type, asset_id_type>, vector<variant> > broadcast_queue; market_queue_type broadcast_queue;
for( const auto& obj : objs ) for( const auto& obj : objs )
{ {
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj); if( obj->id.is<call_order_object>() )
if( order )
{ {
auto sub = _market_subscriptions.find( order->get_market() ); enqueue_if_subscribed_to_market<call_order_object>( obj, broadcast_queue, false );
if( sub != _market_subscriptions.end() ) }
broadcast_queue[order->get_market()].emplace_back( order->id ); else if( obj->id.is<limit_order_object>() )
{
enqueue_if_subscribed_to_market<limit_order_object>( obj, broadcast_queue, false );
} }
} }
if( broadcast_queue.size() )
{ broadcast_market_updates(broadcast_queue);
auto capture_this = shared_from_this();
fc::async([capture_this,this,broadcast_queue](){
for( const auto& item : broadcast_queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
});
}
} }
} }
void database_api_impl::check_for_market_objects(const vector<object_id_type>& ids)
{
if( _market_subscriptions.size() )
{
market_queue_type broadcast_queue;
for(auto id : ids)
{
if( id.is<call_order_object>() )
{
enqueue_if_subscribed_to_market<call_order_object>( _db.find_object(id), broadcast_queue );
}
else if( id.is<limit_order_object>() )
{
enqueue_if_subscribed_to_market<limit_order_object>( _db.find_object(id), broadcast_queue );
}
}
broadcast_market_updates(broadcast_queue);
}
}
void database_api_impl::on_objects_new(const vector<object_id_type>& ids)
{
check_for_market_objects(ids);
}
void database_api_impl::on_objects_changed(const vector<object_id_type>& ids) void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
{ {
vector<variant> updates; if( _subscribe_callback )
map< pair<asset_id_type, asset_id_type>, vector<variant> > market_broadcast_queue;
for(auto id : ids)
{ {
const object* obj = nullptr; vector<variant> updates;
if( is_subscribed_to_item(id) )
for(auto id : ids)
{ {
obj = _db.find_object( id ); const object* obj = nullptr;
if( obj ) if( is_subscribed_to_item(id) )
{ {
updates.emplace_back( obj->to_variant() );
}
else
{
updates.emplace_back(id); // send just the id to indicate removal
}
}
if( _market_subscriptions.size() )
{
if( !is_subscribed_to_item(id) )
obj = _db.find_object( id ); obj = _db.find_object( id );
if( obj ) if( obj )
{
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
if( order )
{ {
auto sub = _market_subscriptions.find( order->get_market() ); updates.emplace_back( obj->to_variant() );
if( sub != _market_subscriptions.end() )
market_broadcast_queue[order->get_market()].emplace_back( order->id );
} }
} }
} }
broadcast_updates(updates);
} }
auto capture_this = shared_from_this(); check_for_market_objects(ids);
/// pushing the future back / popping the prior future if it is complete.
/// if a connection hangs then this could get backed up and result in
/// a failure to exit cleanly.
fc::async([capture_this,this,updates,market_broadcast_queue](){
if( _subscribe_callback && updates.size() ) _subscribe_callback( updates );
for( const auto& item : market_broadcast_queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
});
} }
/** note: this method cannot yield because it is called in the middle of /** note: this method cannot yield because it is called in the middle of