Merge commit '0ecdc90d4df2ebd80a6d4cfa4388f4b8a608c9c1' into betting
This commit is contained in:
commit
b200fb893b
4 changed files with 144 additions and 91 deletions
|
|
@ -41,6 +41,8 @@
|
||||||
|
|
||||||
#define GET_REQUIRED_FEES_MAX_RECURSION 4
|
#define GET_REQUIRED_FEES_MAX_RECURSION 4
|
||||||
|
|
||||||
|
typedef std::map< std::pair<graphene::chain::asset_id_type, graphene::chain::asset_id_type>, std::vector<fc::variant> > market_queue_type;
|
||||||
|
|
||||||
namespace graphene { namespace app {
|
namespace graphene { namespace app {
|
||||||
|
|
||||||
class database_api_impl;
|
class database_api_impl;
|
||||||
|
|
@ -56,7 +58,7 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
|
||||||
fc::variants get_objects(const vector<object_id_type>& ids)const;
|
fc::variants get_objects(const vector<object_id_type>& ids)const;
|
||||||
|
|
||||||
// Subscriptions
|
// Subscriptions
|
||||||
void set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter );
|
void set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create );
|
||||||
void set_pending_transaction_callback( std::function<void(const variant&)> cb );
|
void set_pending_transaction_callback( std::function<void(const variant&)> cb );
|
||||||
void set_block_applied_callback( std::function<void(const variant& block_id)> cb );
|
void set_block_applied_callback( std::function<void(const variant& block_id)> cb );
|
||||||
void cancel_all_subscriptions();
|
void cancel_all_subscriptions();
|
||||||
|
|
@ -167,22 +169,41 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
|
||||||
{
|
{
|
||||||
if( !_subscribe_callback )
|
if( !_subscribe_callback )
|
||||||
return false;
|
return false;
|
||||||
return true;
|
|
||||||
return _subscribe_filter.contains( i );
|
return _subscribe_filter.contains( i );
|
||||||
}
|
}
|
||||||
|
|
||||||
void broadcast_updates( const vector<variant>& updates );
|
template<typename T>
|
||||||
|
void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true)
|
||||||
|
{
|
||||||
|
const T* order = dynamic_cast<const T*>(obj);
|
||||||
|
FC_ASSERT( order != nullptr);
|
||||||
|
|
||||||
|
auto market = order->get_market();
|
||||||
|
|
||||||
|
auto sub = _market_subscriptions.find( market );
|
||||||
|
if( sub != _market_subscriptions.end() ) {
|
||||||
|
queue[market].emplace_back( full_object ? obj->to_variant() : fc::variant(obj->id) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void broadcast_updates( const vector<variant>& updates );
|
||||||
|
void broadcast_market_updates( const market_queue_type& queue);
|
||||||
|
void check_for_market_objects(const vector<object_id_type>& ids);
|
||||||
|
|
||||||
/** called every time a block is applied to report the objects that were changed */
|
/** called every time a block is applied to report the objects that were changed */
|
||||||
|
void on_objects_new(const vector<object_id_type>& ids);
|
||||||
void on_objects_changed(const vector<object_id_type>& ids);
|
void on_objects_changed(const vector<object_id_type>& ids);
|
||||||
void on_objects_removed(const vector<const object*>& objs);
|
void on_objects_removed(const vector<const object*>& objs);
|
||||||
void on_applied_block();
|
void on_applied_block();
|
||||||
|
|
||||||
mutable fc::bloom_filter _subscribe_filter;
|
bool _notify_remove_create = false;
|
||||||
|
mutable fc::bloom_filter _subscribe_filter;
|
||||||
std::function<void(const fc::variant&)> _subscribe_callback;
|
std::function<void(const fc::variant&)> _subscribe_callback;
|
||||||
std::function<void(const fc::variant&)> _pending_trx_callback;
|
std::function<void(const fc::variant&)> _pending_trx_callback;
|
||||||
std::function<void(const fc::variant&)> _block_applied_callback;
|
std::function<void(const fc::variant&)> _block_applied_callback;
|
||||||
|
|
||||||
|
boost::signals2::scoped_connection _new_connection;
|
||||||
boost::signals2::scoped_connection _change_connection;
|
boost::signals2::scoped_connection _change_connection;
|
||||||
boost::signals2::scoped_connection _removed_connection;
|
boost::signals2::scoped_connection _removed_connection;
|
||||||
boost::signals2::scoped_connection _applied_block_connection;
|
boost::signals2::scoped_connection _applied_block_connection;
|
||||||
|
|
@ -205,6 +226,9 @@ database_api::~database_api() {}
|
||||||
database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db)
|
database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db)
|
||||||
{
|
{
|
||||||
wlog("creating database api ${x}", ("x",int64_t(this)) );
|
wlog("creating database api ${x}", ("x",int64_t(this)) );
|
||||||
|
_new_connection = _db.new_objects.connect([this](const vector<object_id_type>& ids) {
|
||||||
|
on_objects_new(ids);
|
||||||
|
});
|
||||||
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) {
|
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) {
|
||||||
on_objects_changed(ids);
|
on_objects_changed(ids);
|
||||||
});
|
});
|
||||||
|
|
@ -265,24 +289,23 @@ fc::variants database_api_impl::get_objects(const vector<object_id_type>& ids)co
|
||||||
// //
|
// //
|
||||||
//////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
|
void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create )
|
||||||
{
|
{
|
||||||
my->set_subscribe_callback( cb, clear_filter );
|
my->set_subscribe_callback( cb, notify_remove_create );
|
||||||
}
|
}
|
||||||
|
|
||||||
void database_api_impl::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
|
void database_api_impl::set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create )
|
||||||
{
|
{
|
||||||
edump((clear_filter));
|
//edump((clear_filter));
|
||||||
_subscribe_callback = cb;
|
_subscribe_callback = cb;
|
||||||
if( clear_filter || !cb )
|
_notify_remove_create = notify_remove_create;
|
||||||
{
|
|
||||||
static fc::bloom_parameters param;
|
static fc::bloom_parameters param;
|
||||||
param.projected_element_count = 10000;
|
param.projected_element_count = 10000;
|
||||||
param.false_positive_probability = 1.0/10000;
|
param.false_positive_probability = 1.0/10000;
|
||||||
param.maximum_size = 1024*8*8*2;
|
param.maximum_size = 1024*8*8*2;
|
||||||
param.compute_optimal_parameters();
|
param.compute_optimal_parameters();
|
||||||
_subscribe_filter = fc::bloom_filter(param);
|
_subscribe_filter = fc::bloom_filter(param);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void database_api::set_pending_transaction_callback( std::function<void(const variant&)> cb )
|
void database_api::set_pending_transaction_callback( std::function<void(const variant&)> cb )
|
||||||
|
|
@ -586,7 +609,6 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const
|
||||||
|
|
||||||
if( subscribe )
|
if( subscribe )
|
||||||
{
|
{
|
||||||
ilog( "subscribe to ${id}", ("id",account->name) );
|
|
||||||
subscribe_to_item( account->id );
|
subscribe_to_item( account->id );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -648,7 +670,7 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const
|
||||||
[&acnt] (const call_order_object& call) {
|
[&acnt] (const call_order_object& call) {
|
||||||
acnt.call_orders.emplace_back(call);
|
acnt.call_orders.emplace_back(call);
|
||||||
});
|
});
|
||||||
|
|
||||||
// get assets issued by user
|
// get assets issued by user
|
||||||
auto asset_range = _db.get_index_type<asset_index>().indices().get<by_issuer>().equal_range(account->id);
|
auto asset_range = _db.get_index_type<asset_index>().indices().get<by_issuer>().equal_range(account->id);
|
||||||
std::for_each(asset_range.first, asset_range.second,
|
std::for_each(asset_range.first, asset_range.second,
|
||||||
|
|
@ -1867,10 +1889,27 @@ vector<blinded_balance_object> database_api_impl::get_blinded_balances( const fl
|
||||||
|
|
||||||
void database_api_impl::broadcast_updates( const vector<variant>& updates )
|
void database_api_impl::broadcast_updates( const vector<variant>& updates )
|
||||||
{
|
{
|
||||||
if( updates.size() ) {
|
if( updates.size() && _subscribe_callback ) {
|
||||||
auto capture_this = shared_from_this();
|
auto capture_this = shared_from_this();
|
||||||
fc::async([capture_this,updates](){
|
fc::async([capture_this,updates](){
|
||||||
capture_this->_subscribe_callback( fc::variant(updates) );
|
if(capture_this->_subscribe_callback)
|
||||||
|
capture_this->_subscribe_callback( fc::variant(updates) );
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void database_api_impl::broadcast_market_updates( const market_queue_type& queue)
|
||||||
|
{
|
||||||
|
if( queue.size() )
|
||||||
|
{
|
||||||
|
auto capture_this = shared_from_this();
|
||||||
|
fc::async([capture_this, this, queue](){
|
||||||
|
for( const auto& item : queue )
|
||||||
|
{
|
||||||
|
auto sub = _market_subscriptions.find(item.first);
|
||||||
|
if( sub != _market_subscriptions.end() )
|
||||||
|
sub->second( fc::variant(item.second ) );
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1880,95 +1919,89 @@ void database_api_impl::on_objects_removed( const vector<const object*>& objs )
|
||||||
/// we need to ensure the database_api is not deleted for the life of the async operation
|
/// we need to ensure the database_api is not deleted for the life of the async operation
|
||||||
if( _subscribe_callback )
|
if( _subscribe_callback )
|
||||||
{
|
{
|
||||||
vector<variant> updates;
|
vector<variant> updates;
|
||||||
updates.reserve(objs.size());
|
|
||||||
|
|
||||||
for( auto obj : objs )
|
for( auto obj : objs )
|
||||||
updates.emplace_back( obj->id );
|
{
|
||||||
|
if ( is_subscribed_to_item(obj->id) )
|
||||||
|
{
|
||||||
|
updates.push_back( fc::variant(obj->id) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
broadcast_updates( updates );
|
broadcast_updates( updates );
|
||||||
}
|
}
|
||||||
|
|
||||||
if( _market_subscriptions.size() )
|
if( _market_subscriptions.size() )
|
||||||
{
|
{
|
||||||
map< pair<asset_id_type, asset_id_type>, vector<variant> > broadcast_queue;
|
market_queue_type broadcast_queue;
|
||||||
|
|
||||||
for( const auto& obj : objs )
|
for( const auto& obj : objs )
|
||||||
{
|
{
|
||||||
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
|
if( obj->id.is<call_order_object>() )
|
||||||
if( order )
|
|
||||||
{
|
{
|
||||||
auto sub = _market_subscriptions.find( order->get_market() );
|
enqueue_if_subscribed_to_market<call_order_object>( obj, broadcast_queue, false );
|
||||||
if( sub != _market_subscriptions.end() )
|
}
|
||||||
broadcast_queue[order->get_market()].emplace_back( order->id );
|
else if( obj->id.is<limit_order_object>() )
|
||||||
|
{
|
||||||
|
enqueue_if_subscribed_to_market<limit_order_object>( obj, broadcast_queue, false );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if( broadcast_queue.size() )
|
|
||||||
{
|
broadcast_market_updates(broadcast_queue);
|
||||||
auto capture_this = shared_from_this();
|
|
||||||
fc::async([capture_this,this,broadcast_queue](){
|
|
||||||
for( const auto& item : broadcast_queue )
|
|
||||||
{
|
|
||||||
auto sub = _market_subscriptions.find(item.first);
|
|
||||||
if( sub != _market_subscriptions.end() )
|
|
||||||
sub->second( fc::variant(item.second ) );
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void database_api_impl::check_for_market_objects(const vector<object_id_type>& ids)
|
||||||
|
{
|
||||||
|
if( _market_subscriptions.size() )
|
||||||
|
{
|
||||||
|
market_queue_type broadcast_queue;
|
||||||
|
|
||||||
|
for(auto id : ids)
|
||||||
|
{
|
||||||
|
if( id.is<call_order_object>() )
|
||||||
|
{
|
||||||
|
enqueue_if_subscribed_to_market<call_order_object>( _db.find_object(id), broadcast_queue );
|
||||||
|
}
|
||||||
|
else if( id.is<limit_order_object>() )
|
||||||
|
{
|
||||||
|
enqueue_if_subscribed_to_market<limit_order_object>( _db.find_object(id), broadcast_queue );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
broadcast_market_updates(broadcast_queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void database_api_impl::on_objects_new(const vector<object_id_type>& ids)
|
||||||
|
{
|
||||||
|
check_for_market_objects(ids);
|
||||||
|
}
|
||||||
|
|
||||||
void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
|
void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
|
||||||
{
|
{
|
||||||
vector<variant> updates;
|
if( _subscribe_callback )
|
||||||
map< pair<asset_id_type, asset_id_type>, vector<variant> > market_broadcast_queue;
|
|
||||||
|
|
||||||
for(auto id : ids)
|
|
||||||
{
|
{
|
||||||
const object* obj = nullptr;
|
vector<variant> updates;
|
||||||
if( _subscribe_callback )
|
|
||||||
|
for(auto id : ids)
|
||||||
{
|
{
|
||||||
obj = _db.find_object( id );
|
const object* obj = nullptr;
|
||||||
if( obj )
|
if( is_subscribed_to_item(id) )
|
||||||
{
|
{
|
||||||
updates.emplace_back( obj->to_variant() );
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
updates.emplace_back(id); // send just the id to indicate removal
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if( _market_subscriptions.size() )
|
|
||||||
{
|
|
||||||
if( !_subscribe_callback )
|
|
||||||
obj = _db.find_object( id );
|
obj = _db.find_object( id );
|
||||||
if( obj )
|
if( obj )
|
||||||
{
|
|
||||||
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
|
|
||||||
if( order )
|
|
||||||
{
|
{
|
||||||
auto sub = _market_subscriptions.find( order->get_market() );
|
updates.emplace_back( obj->to_variant() );
|
||||||
if( sub != _market_subscriptions.end() )
|
|
||||||
market_broadcast_queue[order->get_market()].emplace_back( order->id );
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
broadcast_updates(updates);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto capture_this = shared_from_this();
|
check_for_market_objects(ids);
|
||||||
|
|
||||||
/// pushing the future back / popping the prior future if it is complete.
|
|
||||||
/// if a connection hangs then this could get backed up and result in
|
|
||||||
/// a failure to exit cleanly.
|
|
||||||
fc::async([capture_this,this,updates,market_broadcast_queue](){
|
|
||||||
if( _subscribe_callback ) _subscribe_callback( updates );
|
|
||||||
|
|
||||||
for( const auto& item : market_broadcast_queue )
|
|
||||||
{
|
|
||||||
auto sub = _market_subscriptions.find(item.first);
|
|
||||||
if( sub != _market_subscriptions.end() )
|
|
||||||
sub->second( fc::variant(item.second ) );
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** note: this method cannot yield because it is called in the middle of
|
/** note: this method cannot yield because it is called in the middle of
|
||||||
|
|
|
||||||
|
|
@ -241,7 +241,7 @@ processed_transaction database::_push_transaction( const signed_transaction& trx
|
||||||
auto processed_trx = _apply_transaction( trx );
|
auto processed_trx = _apply_transaction( trx );
|
||||||
_pending_tx.push_back(processed_trx);
|
_pending_tx.push_back(processed_trx);
|
||||||
|
|
||||||
notify_changed_objects();
|
// notify_changed_objects();
|
||||||
// The transaction applied successfully. Merge its changes into the pending block session.
|
// The transaction applied successfully. Merge its changes into the pending block session.
|
||||||
temp_session.merge();
|
temp_session.merge();
|
||||||
|
|
||||||
|
|
@ -549,19 +549,26 @@ void database::notify_changed_objects()
|
||||||
if( _undo_db.enabled() )
|
if( _undo_db.enabled() )
|
||||||
{
|
{
|
||||||
const auto& head_undo = _undo_db.head();
|
const auto& head_undo = _undo_db.head();
|
||||||
|
|
||||||
|
vector<object_id_type> new_ids; new_ids.reserve(head_undo.new_ids.size());
|
||||||
|
for( const auto& item : head_undo.new_ids ) new_ids.push_back(item);
|
||||||
|
|
||||||
vector<object_id_type> changed_ids; changed_ids.reserve(head_undo.old_values.size());
|
vector<object_id_type> changed_ids; changed_ids.reserve(head_undo.old_values.size());
|
||||||
for( const auto& item : head_undo.old_values ) changed_ids.push_back(item.first);
|
for( const auto& item : head_undo.old_values ) changed_ids.push_back(item.first);
|
||||||
for( const auto& item : head_undo.new_ids ) changed_ids.push_back(item);
|
|
||||||
vector<const object*> removed;
|
vector<object_id_type> removed_ids; removed_ids.reserve( head_undo.removed.size() );
|
||||||
removed.reserve( head_undo.removed.size() );
|
vector<const object*> removed; removed.reserve( head_undo.removed.size() );
|
||||||
for( const auto& item : head_undo.removed )
|
for( const auto& item : head_undo.removed )
|
||||||
{
|
{
|
||||||
changed_ids.push_back( item.first );
|
removed_ids.emplace_back( item.first );
|
||||||
removed.emplace_back( item.second.get() );
|
removed.emplace_back( item.second.get() );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
new_objects(new_ids);
|
||||||
changed_objects(changed_ids);
|
changed_objects(changed_ids);
|
||||||
|
removed_objects(removed_ids, removed);
|
||||||
}
|
}
|
||||||
} FC_CAPTURE_AND_RETHROW() }
|
} FC_CAPTURE_AND_LOG( () ) }
|
||||||
|
|
||||||
processed_transaction database::apply_transaction(const signed_transaction& trx, uint32_t skip)
|
processed_transaction database::apply_transaction(const signed_transaction& trx, uint32_t skip)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -189,6 +189,12 @@ namespace graphene { namespace chain {
|
||||||
*/
|
*/
|
||||||
fc::signal<void(const signed_transaction&)> on_pending_transaction;
|
fc::signal<void(const signed_transaction&)> on_pending_transaction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emitted After a block has been applied and committed. The callback
|
||||||
|
* should not yield and should execute quickly.
|
||||||
|
*/
|
||||||
|
fc::signal<void(const vector<object_id_type>&)> new_objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Emitted After a block has been applied and committed. The callback
|
* Emitted After a block has been applied and committed. The callback
|
||||||
* should not yield and should execute quickly.
|
* should not yield and should execute quickly.
|
||||||
|
|
@ -198,7 +204,7 @@ namespace graphene { namespace chain {
|
||||||
/** this signal is emitted any time an object is removed and contains a
|
/** this signal is emitted any time an object is removed and contains a
|
||||||
* pointer to the last value of every object that was removed.
|
* pointer to the last value of every object that was removed.
|
||||||
*/
|
*/
|
||||||
fc::signal<void(const vector<const object*>&)> removed_objects;
|
fc::signal<void(const vector<object_id_type>&, const vector<const object*>&)> removed_objects;
|
||||||
|
|
||||||
//////////////////// db_witness_schedule.cpp ////////////////////
|
//////////////////// db_witness_schedule.cpp ////////////////////
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -120,6 +120,13 @@ class call_order_object : public abstract_object<call_order_object>
|
||||||
share_type collateral; ///< call_price.base.asset_id, access via get_collateral
|
share_type collateral; ///< call_price.base.asset_id, access via get_collateral
|
||||||
share_type debt; ///< call_price.quote.asset_id, access via get_collateral
|
share_type debt; ///< call_price.quote.asset_id, access via get_collateral
|
||||||
price call_price; ///< Debt / Collateral
|
price call_price; ///< Debt / Collateral
|
||||||
|
|
||||||
|
pair<asset_id_type,asset_id_type> get_market()const
|
||||||
|
{
|
||||||
|
auto tmp = std::make_pair( call_price.base.asset_id, call_price.quote.asset_id );
|
||||||
|
if( tmp.first > tmp.second ) std::swap( tmp.first, tmp.second );
|
||||||
|
return tmp;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue