Refactor full account subscriptions

When subscribing to an account via the get_full_account API it will
start streaming any object relevant to the account that is added,
removed, or modified.
This commit is contained in:
Daniel Larimer 2015-07-21 15:19:52 -04:00
parent b3d299d241
commit dffc83cca9
10 changed files with 251 additions and 33 deletions

View file

@ -21,6 +21,9 @@
#include <graphene/chain/database.hpp>
#include <graphene/utilities/key_conversion.hpp>
#include <graphene/chain/protocol/fee_schedule.hpp>
#include <graphene/chain/withdraw_permission_object.hpp>
#include <graphene/chain/worker_evaluator.hpp>
#include <graphene/chain/transaction_object.hpp>
#include <fc/crypto/hex.hpp>
#include <fc/smart_ref_impl.hpp>
@ -32,6 +35,9 @@ namespace graphene { namespace app {
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) {
on_objects_changed(ids);
});
_removed_connection = _db.removed_objects.connect([this](const vector<const object*>& objs) {
on_objects_removed(objs);
});
_applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); });
}
@ -158,8 +164,8 @@ namespace graphene { namespace app {
std::map<std::string, full_account> database_api::get_full_accounts(std::function<void(const variant&)> callback,
const vector<std::string>& names_or_ids)
{
FC_ASSERT( _account_subscriptions.size() < 1024 );
std::map<std::string, full_account> results;
std::set<object_id_type> ids_to_subscribe;
for (const std::string& account_name_or_id : names_or_ids)
{
@ -176,7 +182,7 @@ namespace graphene { namespace app {
if (account == nullptr)
continue;
ids_to_subscribe.insert({account->id, account->statistics});
_account_subscriptions[account->id] = callback;
// fc::mutable_variant_object full_account;
full_account acnt;
@ -194,7 +200,6 @@ namespace graphene { namespace app {
*/
if (account->cashback_vb)
{
ids_to_subscribe.insert(*account->cashback_vb);
acnt.cashback_balance = account->cashback_balance(_db);
}
@ -202,36 +207,31 @@ namespace graphene { namespace app {
auto balance_range = _db.get_index_type<account_balance_index>().indices().get<by_account>().equal_range(account->id);
//vector<account_balance_object> balances;
std::for_each(balance_range.first, balance_range.second,
[&acnt, &ids_to_subscribe](const account_balance_object& balance) {
[&acnt](const account_balance_object& balance) {
acnt.balances.emplace_back(balance);
ids_to_subscribe.insert(balance.id);
});
// Add the account's vesting balances
auto vesting_range = _db.get_index_type<vesting_balance_index>().indices().get<by_account>().equal_range(account->id);
std::for_each(vesting_range.first, vesting_range.second,
[&acnt, &ids_to_subscribe](const vesting_balance_object& balance) {
[&acnt](const vesting_balance_object& balance) {
acnt.vesting_balances.emplace_back(balance);
ids_to_subscribe.insert(balance.id);
});
// Add the account's orders
auto order_range = _db.get_index_type<limit_order_index>().indices().get<by_account>().equal_range(account->id);
std::for_each(order_range.first, order_range.second,
[&acnt, &ids_to_subscribe] (const limit_order_object& order) {
[&acnt] (const limit_order_object& order) {
acnt.limit_orders.emplace_back(order);
ids_to_subscribe.insert(order.id);
});
auto call_range = _db.get_index_type<call_order_index>().indices().get<by_account>().equal_range(account->id);
std::for_each(call_range.first, call_range.second,
[&acnt, &ids_to_subscribe] (const call_order_object& call) {
[&acnt] (const call_order_object& call) {
acnt.call_orders.emplace_back(call);
ids_to_subscribe.insert(call.id);
});
results[account_name_or_id] = acnt;
}
wdump((results));
subscribe_to_objects(callback, vector<object_id_type>(ids_to_subscribe.begin(), ids_to_subscribe.end()));
return results;
}
@ -554,24 +554,205 @@ namespace graphene { namespace app {
return *_history_api;
}
vector<account_id_type> get_relevant_accounts( const object* obj )
{
vector<account_id_type> result;
if( obj->id.space() == protocol_ids )
{
switch( (object_type)obj->id.type() )
{
case null_object_type:
case base_object_type:
case OBJECT_TYPE_COUNT:
return result;
case account_object_type:{
result.push_back( obj->id );
break;
} case asset_object_type:{
const auto& aobj = dynamic_cast<const asset_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->issuer );
break;
} case force_settlement_object_type:{
const auto& aobj = dynamic_cast<const force_settlement_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->owner );
break;
} case committee_member_object_type:{
const auto& aobj = dynamic_cast<const committee_member_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->committee_member_account );
break;
} case witness_object_type:{
const auto& aobj = dynamic_cast<const witness_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->witness_account );
break;
} case limit_order_object_type:{
const auto& aobj = dynamic_cast<const limit_order_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->seller );
break;
} case call_order_object_type:{
const auto& aobj = dynamic_cast<const call_order_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->borrower );
break;
} case custom_object_type:{
} case proposal_object_type:{
const auto& aobj = dynamic_cast<const proposal_object*>(obj);
assert( aobj != nullptr );
flat_set<account_id_type> impacted;
aobj->proposed_transaction.get_impacted_accounts( impacted );
result.reserve( impacted.size() );
for( auto& item : impacted ) result.emplace_back(item);
break;
} case operation_history_object_type:{
const auto& aobj = dynamic_cast<const operation_history_object*>(obj);
assert( aobj != nullptr );
flat_set<account_id_type> impacted;
operation_get_impacted_accounts( aobj->op, impacted );
result.reserve( impacted.size() );
for( auto& item : impacted ) result.emplace_back(item);
break;
} case withdraw_permission_object_type:{
const auto& aobj = dynamic_cast<const withdraw_permission_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->withdraw_from_account );
result.push_back( aobj->authorized_account );
break;
} case vesting_balance_object_type:{
const auto& aobj = dynamic_cast<const vesting_balance_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->owner );
break;
} case worker_object_type:{
const auto& aobj = dynamic_cast<const worker_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->worker_account );
break;
} case balance_object_type:{
/** these are free from any accounts */
}
}
}
else if( obj->id.space() == implementation_ids )
{
switch( (impl_object_type)obj->id.type() )
{
case impl_global_property_object_type:{
} case impl_dynamic_global_property_object_type:{
} case impl_index_meta_object_type:{
} case impl_asset_dynamic_data_type:{
} case impl_asset_bitasset_data_type:{
break;
} case impl_account_balance_object_type:{
const auto& aobj = dynamic_cast<const account_balance_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->owner );
break;
} case impl_account_statistics_object_type:{
const auto& aobj = dynamic_cast<const account_statistics_object*>(obj);
assert( aobj != nullptr );
result.push_back( aobj->owner );
break;
} case impl_transaction_object_type:{
const auto& aobj = dynamic_cast<const transaction_object*>(obj);
assert( aobj != nullptr );
flat_set<account_id_type> impacted;
aobj->trx.get_impacted_accounts( impacted );
result.reserve( impacted.size() );
for( auto& item : impacted ) result.emplace_back(item);
break;
} case impl_block_summary_object_type:{
} case impl_account_transaction_history_object_type:{
} case impl_witness_schedule_object_type: {
}
}
}
return result;
} // end get_relevant_accounts( obj )
void database_api::on_objects_removed( const vector<const object*>& objs )
{
if( _account_subscriptions.size() )
{
map<account_id_type, vector<variant> > broadcast_queue;
for( const auto& obj : objs )
{
auto relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
auto sub = _account_subscriptions.find(r);
if( sub != _account_subscriptions.end() )
broadcast_queue[r].emplace_back(obj->to_variant());
}
}
_broadcast_removed_complete = fc::async([=](){
for( const auto& item : broadcast_queue )
{
auto sub = _account_subscriptions.find(item.first);
if( sub != _account_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
});
}
}
void database_api::on_objects_changed(const vector<object_id_type>& ids)
{
vector<object_id_type> my_objects;
vector<object_id_type> my_objects;
map<account_id_type, vector<variant> > broadcast_queue;
for(auto id : ids)
{
if(_subscriptions.find(id) != _subscriptions.end())
my_objects.push_back(id);
if( _account_subscriptions.size() )
{
const object* obj = _db.find_object( id );
if( obj )
{
vector<account_id_type> relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
auto sub = _account_subscriptions.find(r);
if( sub != _account_subscriptions.end() )
broadcast_queue[r].emplace_back(obj->to_variant());
}
}
}
}
/// TODO: consider making _broadcast_changes_complete a deque and
/// pushing the future back / popping the prior future if it is complete.
/// if a connection hangs then this could get backed up and result in
/// a failure to exit cleanly.
_broadcast_changes_complete = fc::async([=](){
for( const auto& item : broadcast_queue )
{
auto sub = _account_subscriptions.find(item.first);
if( sub != _account_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
for(auto id : my_objects)
{
const object* obj = _db.find_object(id);
if(obj)
// just incase _usbscriptions changed between filter and broadcast
auto itr = _subscriptions.find( id );
if( itr != _subscriptions.end() )
{
_subscriptions[id](obj->to_variant());
}
else
{
_subscriptions[id](fc::variant(id));
const object* obj = _db.find_object( id );
if( obj != nullptr )
{
itr->second(obj->to_variant());
}
else
{
itr->second(fc::variant(id));
}
}
}
});
@ -624,6 +805,11 @@ namespace graphene { namespace app {
_broadcast_changes_complete.cancel();
_broadcast_changes_complete.wait();
}
if(_broadcast_removed_complete.valid())
{
_broadcast_removed_complete.cancel();
_broadcast_removed_complete.wait();
}
} catch (const fc::exception& e)
{
wlog("${e}", ("e",e.to_detail_string()));
@ -632,6 +818,7 @@ namespace graphene { namespace app {
bool database_api::subscribe_to_objects( const std::function<void(const fc::variant&)>& callback, const vector<object_id_type>& ids)
{
FC_ASSERT( _subscriptions.size() < 1024 );
for(auto id : ids) _subscriptions[id] = callback;
return true;
}

View file

@ -318,12 +318,16 @@ namespace graphene { namespace app {
private:
/** called every time a block is applied to report the objects that were changed */
void on_objects_changed(const vector<object_id_type>& ids);
void on_objects_removed(const vector<const object*>& objs);
void on_applied_block();
fc::future<void> _broadcast_changes_complete;
fc::future<void> _broadcast_removed_complete;
boost::signals2::scoped_connection _change_connection;
boost::signals2::scoped_connection _removed_connection;
boost::signals2::scoped_connection _applied_block_connection;
map<object_id_type, std::function<void(const fc::variant&)> > _subscriptions;
map<account_id_type, std::function<void(const fc::variant&)> > _account_subscriptions;
map< pair<asset_id_type,asset_id_type>, std::function<void(const variant&)> > _market_subscriptions;
graphene::chain::database& _db;
};

View file

@ -443,6 +443,14 @@ void database::notify_changed_objects()
const auto& head_undo = _undo_db.head();
vector<object_id_type> changed_ids; changed_ids.reserve(head_undo.old_values.size());
for( const auto& item : head_undo.old_values ) changed_ids.push_back(item.first);
for( const auto& item : head_undo.new_ids ) changed_ids.push_back(item);
vector<const object*> removed;
removed.reserve( head_undo.removed.size() );
for( const auto& item : head_undo.removed )
{
changed_ids.push_back( item.first );
removed.emplace_back( item.second.get() );
}
changed_objects(changed_ids);
}

View file

@ -38,6 +38,8 @@ namespace graphene { namespace chain {
static const uint8_t space_id = implementation_ids;
static const uint8_t type_id = impl_account_statistics_object_type;
account_id_type owner;
/**
* Keep the most recent operation as a root pointer to a linked list of the transaction history. This field is
* not required by core validation and could in theory be made an annotation on the account object, but
@ -335,6 +337,7 @@ FC_REFLECT_DERIVED( graphene::chain::meta_account_object,
(memo_key)(committee_member_id) )
FC_REFLECT_DERIVED( graphene::chain::account_statistics_object, (graphene::chain::object),
(owner)
(most_recent_op)
(total_core_in_orders)
(lifetime_fees_paid)

View file

@ -198,6 +198,11 @@ namespace graphene { namespace chain {
*/
fc::signal<void(const vector<object_id_type>&)> changed_objects;
/** this signal is emitted any time an object is removed and contains a
* pointer to the last value of every object that was removed.
*/
fc::signal<void(const vector<const object*>&)> removed_objects;
//////////////////// db_witness_schedule.cpp ////////////////////
/**

View file

@ -100,6 +100,7 @@ namespace graphene { namespace chain {
}
void get_required_authorities( flat_set<account_id_type>& active, flat_set<account_id_type>& owner, vector<authority>& other )const;
void get_impacted_accounts( flat_set<account_id_type>& )const;
};
/**

View file

@ -133,10 +133,8 @@ namespace graphene { namespace chain {
impl_index_meta_object_type,
impl_asset_dynamic_data_type,
impl_asset_bitasset_data_type,
impl_committee_member_feeds_object_type,
impl_account_balance_object_type,
impl_account_statistics_object_type,
impl_account_debt_object_type,
impl_transaction_object_type,
impl_block_summary_object_type,
impl_account_transaction_history_object_type,
@ -193,7 +191,6 @@ namespace graphene { namespace chain {
class asset_bitasset_data_object;
class account_balance_object;
class account_statistics_object;
class account_debt_object;
class transaction_object;
class block_summary_object;
class account_transaction_history_object;
@ -204,7 +201,6 @@ namespace graphene { namespace chain {
typedef object_id< implementation_ids, impl_asset_bitasset_data_type, asset_bitasset_data_object> asset_bitasset_data_id_type;
typedef object_id< implementation_ids, impl_account_balance_object_type, account_balance_object> account_balance_id_type;
typedef object_id< implementation_ids, impl_account_statistics_object_type,account_statistics_object> account_statistics_id_type;
typedef object_id< implementation_ids, impl_account_debt_object_type, account_debt_object> account_debt_id_type;
typedef object_id< implementation_ids, impl_transaction_object_type, transaction_object> transaction_obj_id_type;
typedef object_id< implementation_ids, impl_block_summary_object_type, block_summary_object> block_summary_id_type;
@ -384,10 +380,8 @@ FC_REFLECT_ENUM( graphene::chain::impl_object_type,
(impl_index_meta_object_type)
(impl_asset_dynamic_data_type)
(impl_asset_bitasset_data_type)
(impl_committee_member_feeds_object_type)
(impl_account_balance_object_type)
(impl_account_statistics_object_type)
(impl_account_debt_object_type)
(impl_transaction_object_type)
(impl_block_summary_object_type)
(impl_account_transaction_history_object_type)
@ -417,7 +411,6 @@ FC_REFLECT_TYPENAME( graphene::chain::asset_dynamic_data_id_type )
FC_REFLECT_TYPENAME( graphene::chain::asset_bitasset_data_id_type )
FC_REFLECT_TYPENAME( graphene::chain::account_balance_id_type )
FC_REFLECT_TYPENAME( graphene::chain::account_statistics_id_type )
FC_REFLECT_TYPENAME( graphene::chain::account_debt_id_type )
FC_REFLECT_TYPENAME( graphene::chain::transaction_obj_id_type )
FC_REFLECT_TYPENAME( graphene::chain::block_summary_id_type )
FC_REFLECT_TYPENAME( graphene::chain::account_transaction_history_id_type )

View file

@ -133,6 +133,23 @@ struct required_owner_visitor
};
struct get_impacted_account_visitor
{
flat_set<account_id_type>& _impacted;
get_impacted_account_visitor( flat_set<account_id_type>& impact ):_impacted(impact) {}
typedef void result_type;
template<typename T>
void operator()( const T& o )const
{
o.get_impacted_accounts( _impacted );
}
};
void operation_get_impacted_accounts( const operation& op, flat_set<account_id_type>& result )
{
op.visit( get_impacted_account_visitor( result ) );
}
void operation_get_required_authorities( const operation& op, vector<authority>& result )
{
op.visit( required_auth_visitor( result ) );

View file

@ -275,4 +275,10 @@ void signed_transaction::verify_authority( const std::function<const authority*(
graphene::chain::verify_authority( operations, get_signature_keys(), get_active, get_owner, max_recursion );
} FC_CAPTURE_AND_RETHROW( (*this) ) }
void transaction::get_impacted_accounts( flat_set<account_id_type>& impacted ) const
{
for( const auto& op : operations )
operation_get_impacted_accounts( op, impacted );
}
} } // graphene::chain

View file

@ -68,12 +68,6 @@ struct operation_get_impacted_accounts
{}
typedef void result_type;
void add_authority( const authority& a )const
{
for( auto& item : a.account_auths )
_impacted.insert( item.first );
}
void operator()( const account_create_operation& o )const {
_impacted.insert( _op_history.result.get<object_id_type>() );
}