update subscribe callback

This commit is contained in:
Daniel Larimer 2015-08-26 18:01:48 -04:00
parent a79eff2761
commit e5106c15a3
5 changed files with 100 additions and 143 deletions

View file

@ -52,8 +52,44 @@ namespace graphene { namespace app {
elog("freeing database api ${x}", ("x",int64_t(this)) );
}
void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
{
edump((clear_filter));
_subscribe_callback = cb;
if( clear_filter || !cb )
{
static fc::bloom_parameters param;
param.projected_element_count = 10000;
param.false_positive_probability = 1.0/10000;
param.maximum_size = 1024*8*8*2;
_subscribe_filter = fc::bloom_filter(param);
}
}
void database_api::subscribe_to_id( object_id_type id )const
{
idump((id));
if( _subscribe_callback )
_subscribe_filter.insert( (const unsigned char*)&id, sizeof(id) );
else
elog( "unable to subscribe to id because there is no subscribe callback set" );
}
fc::variants database_api::get_objects(const vector<object_id_type>& ids)const
{
if( _subscribe_callback ) {
for( auto id : ids )
{
if( id.type() == operation_history_object_type && id.space() == protocol_ids ) continue;
if( id.type() == impl_account_transaction_history_object_type && id.space() == implementation_ids ) continue;
subscribe_to_id( id );
}
}
else
{
elog( "getObjects without subscribe callback??" );
}
fc::variants result;
result.reserve(ids.size());
@ -150,7 +186,10 @@ namespace graphene { namespace app {
std::transform(account_ids.begin(), account_ids.end(), std::back_inserter(result),
[this](account_id_type id) -> optional<account_object> {
if(auto o = _db.find(id))
{
subscribe_to_id( id );
return *o;
}
return {};
});
return result;
@ -162,7 +201,10 @@ namespace graphene { namespace app {
std::transform(asset_ids.begin(), asset_ids.end(), std::back_inserter(result),
[this](asset_id_type id) -> optional<asset_object> {
if(auto o = _db.find(id))
{
subscribe_to_id( id );
return *o;
}
return {};
});
return result;
@ -182,35 +224,17 @@ namespace graphene { namespace app {
for( auto itr = accounts_by_name.lower_bound(lower_bound_name);
limit-- && itr != accounts_by_name.end();
++itr )
{
result.insert(make_pair(itr->name, itr->get_id()));
if( limit == 1 )
subscribe_to_id( itr->get_id() );
}
return result;
}
void database_api::unsubscribe_from_accounts( const vector<std::string>& names_or_ids )
{
for (const std::string& account_name_or_id : names_or_ids)
{
const account_object* account = nullptr;
if (std::isdigit(account_name_or_id[0]))
account = _db.find(fc::variant(account_name_or_id).as<account_id_type>());
else
{
const auto& idx = _db.get_index_type<account_index>().indices().get<by_name>();
auto itr = idx.find(account_name_or_id);
if (itr != idx.end())
account = &*itr;
}
if (account == nullptr)
continue;
_account_subscriptions.erase(account->id);
}
}
std::map<std::string, full_account> database_api::get_full_accounts(std::function<void(const variant&)> callback,
const vector<std::string>& names_or_ids, bool subscribe)
std::map<std::string, full_account> database_api::get_full_accounts( const vector<std::string>& names_or_ids, bool subscribe)
{
FC_ASSERT( _account_subscriptions.size() < 1024 );
std::map<std::string, full_account> results;
for (const std::string& account_name_or_id : names_or_ids)
@ -231,12 +255,7 @@ namespace graphene { namespace app {
if( subscribe )
{
ilog( "subscribe to ${id}", ("id",account->name) );
_account_subscriptions[account->id] = callback;
}
else
{
wlog( "unsubscribe to ${id}", ("id",account->name) );
_account_subscriptions.erase(account->id);
subscribe_to_id( account->id );
}
// fc::mutable_variant_object full_account;
@ -795,7 +814,7 @@ namespace graphene { namespace app {
/// we need to ensure the database_api is not deleted for the life of the async operation
auto capture_this = shared_from_this();
if( _account_subscriptions.size() )
if( _subscribe_callback )
{
map<account_id_type, vector<variant> > broadcast_queue;
for( const auto& obj : objs )
@ -803,24 +822,21 @@ namespace graphene { namespace app {
auto relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
auto sub = _account_subscriptions.find(r);
if( sub != _account_subscriptions.end() )
if( _subscribe_filter.contains(r) )
broadcast_queue[r].emplace_back(obj->to_variant());
}
if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) )
broadcast_queue[account_id_type()].emplace_back(obj->to_variant());
}
if( broadcast_queue.size() )
{
fc::async([capture_this,broadcast_queue,this](){
for( const auto& item : broadcast_queue )
{
auto sub = _account_subscriptions.find(item.first);
if( sub != _account_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
_subscribe_callback( fc::variant(broadcast_queue) );
});
}
}
if( _market_subscriptions.size() )
{
map< pair<asset_id_type, asset_id_type>, vector<variant> > broadcast_queue;
@ -850,16 +866,14 @@ namespace graphene { namespace app {
void database_api::on_objects_changed(const vector<object_id_type>& ids)
{
vector<object_id_type> my_objects;
map<account_id_type, vector<variant> > broadcast_queue;
vector<variant> updates;
map< pair<asset_id_type, asset_id_type>, vector<variant> > market_broadcast_queue;
idump((ids));
for(auto id : ids)
{
if(_subscriptions.find(id) != _subscriptions.end())
my_objects.push_back(id);
const object* obj = nullptr;
if( _account_subscriptions.size() )
if( _subscribe_callback )
{
obj = _db.find_object( id );
if( obj )
@ -867,18 +881,23 @@ namespace graphene { namespace app {
vector<account_id_type> relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
auto sub = _account_subscriptions.find(r);
if( sub != _account_subscriptions.end() )
broadcast_queue[r].emplace_back(obj->to_variant());
if( _subscribe_filter.contains(r) )
updates.emplace_back(obj->to_variant());
}
if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) )
updates.emplace_back(obj->to_variant());
}
else
elog( "unable to find object ${id}", ("id",id) );
{
if( _subscribe_filter.contains(id) )
updates.emplace_back(id); // send just the id to indicate removal
}
}
if( _market_subscriptions.size() )
{
if( !_account_subscriptions.size() ) obj = _db.find_object( id );
if( !_subscribe_callback )
obj = _db.find_object( id );
if( obj )
{
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
@ -897,42 +916,15 @@ namespace graphene { namespace app {
/// pushing the future back / popping the prior future if it is complete.
/// if a connection hangs then this could get backed up and result in
/// a failure to exit cleanly.
fc::async([capture_this,this,broadcast_queue,market_broadcast_queue,my_objects](){
for( const auto& item : broadcast_queue )
{
edump( (item) );
try {
auto sub = _account_subscriptions.find(item.first);
if( sub != _account_subscriptions.end() )
sub->second( fc::variant(item.second ) );
} catch ( const fc::exception& e )
{
edump((e.to_detail_string()));
}
}
fc::async([capture_this,this,updates,market_broadcast_queue](){
if( _subscribe_callback ) _subscribe_callback( updates );
for( const auto& item : market_broadcast_queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
for(auto id : my_objects)
{
// just incase _usbscriptions changed between filter and broadcast
auto itr = _subscriptions.find( id );
if( itr != _subscriptions.end() )
{
const object* obj = _db.find_object( id );
if( obj != nullptr )
{
itr->second(obj->to_variant());
}
else
{
itr->second(fc::variant(id));
}
}
}
});
}
@ -980,20 +972,6 @@ namespace graphene { namespace app {
});
}
vector<variant> database_api::subscribe_to_objects( const std::function<void(const fc::variant&)>& callback, const vector<object_id_type>& ids)
{
FC_ASSERT( _subscriptions.size() < 1024 );
for(auto id : ids) _subscriptions[id] = callback;
return get_objects( ids );
}
bool database_api::unsubscribe_from_objects(const vector<object_id_type>& ids)
{
for(auto id : ids) _subscriptions.erase(id);
return true;
}
void database_api::subscribe_to_market(std::function<void(const variant&)> callback, asset_id_type a, asset_id_type b)
{
if(a > b) std::swap(a,b);

View file

@ -40,6 +40,7 @@
#include <graphene/net/node.hpp>
#include <fc/api.hpp>
#include <fc/bloom_filter.hpp>
namespace graphene { namespace app {
using namespace graphene::chain;
@ -176,13 +177,8 @@ namespace graphene { namespace app {
* ignored. All other accounts will be retrieved and subscribed.
*
*/
std::map<string,full_account> get_full_accounts(std::function<void(const variant&)> callback,
const vector<string>& names_or_ids, bool subscribe );
std::map<string,full_account> get_full_accounts( const vector<string>& names_or_ids, bool subscribe );
/**
* Stop receiving updates generated by get_full_accounts()
*/
void unsubscribe_from_accounts( const vector<string>& names_or_ids );
/**
* @brief Get limit orders in a given market
@ -277,24 +273,6 @@ namespace graphene { namespace app {
*/
vector<optional<committee_member_object>> get_committee_members(const vector<committee_member_id_type>& committee_member_ids)const;
/**
* @group Push Notification Methods
* These methods may be used to get push notifications whenever an object or market is changed
* @{
*/
/**
* @brief Request notifications when some object(s) change
* @param callback Callback method which is called with the new version of a changed object
* @param ids The set of object IDs to watch
* @return get_objects(ids)
*/
vector<variant> subscribe_to_objects(const std::function<void(const fc::variant&)>& callback,
const vector<object_id_type>& ids);
/**
* @brief Stop receiving notifications for some object(s)
* @param ids The set of object IDs to stop watching
*/
bool unsubscribe_from_objects(const vector<object_id_type>& ids);
/**
* @brief Request notification when the active orders in the market between two assets changes
* @param callback Callback method which is called when the market changes
@ -318,7 +296,7 @@ namespace graphene { namespace app {
* This unsubscribes from all subscribed markets and objects.
*/
void cancel_all_subscriptions()
{ _subscriptions.clear(); _market_subscriptions.clear(); }
{ set_subscribe_callback( std::function<void(const fc::variant&)>(), true); _market_subscriptions.clear(); }
///@}
/// @brief Get a hexdump of the serialized binary form of a transaction
@ -377,18 +355,22 @@ namespace graphene { namespace app {
*/
vector<asset> get_required_fees( const vector<operation>& ops, asset_id_type id = asset_id_type() )const;
void set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter );
private:
void subscribe_to_id( object_id_type id )const;
/** called every time a block is applied to report the objects that were changed */
void on_objects_changed(const vector<object_id_type>& ids);
void on_objects_removed(const vector<const object*>& objs);
void on_applied_block();
mutable fc::bloom_filter _subscribe_filter;
std::function<void(const fc::variant&)> _subscribe_callback;
boost::signals2::scoped_connection _change_connection;
boost::signals2::scoped_connection _removed_connection;
boost::signals2::scoped_connection _applied_block_connection;
map<object_id_type, std::function<void(const fc::variant&)> > _subscriptions;
map<account_id_type, std::function<void(const fc::variant&)> > _account_subscriptions;
map< pair<asset_id_type,asset_id_type>, std::function<void(const variant&)> > _market_subscriptions;
map< pair<asset_id_type,asset_id_type>, std::function<void(const variant&)> > _market_subscriptions;
graphene::chain::database& _db;
};
@ -561,7 +543,6 @@ FC_API(graphene::app::database_api,
(get_account_count)
(lookup_accounts)
(get_full_accounts)
(unsubscribe_from_accounts)
(get_account_balances)
(get_named_account_balances)
(lookup_asset_symbols)
@ -577,8 +558,6 @@ FC_API(graphene::app::database_api,
(get_witness_count)
(lookup_witness_accounts)
(lookup_committee_member_accounts)
(subscribe_to_objects)
(unsubscribe_from_objects)
(subscribe_to_market)
(unsubscribe_from_market)
(cancel_all_subscriptions)
@ -594,6 +573,7 @@ FC_API(graphene::app::database_api,
(verify_authority)
(get_blinded_balances)
(get_required_fees)
(set_subscribe_callback)
)
FC_API(graphene::app::history_api,
(get_account_history)

View file

@ -97,14 +97,26 @@ void delayed_node_plugin::plugin_startup()
try {
connect();
my->database_api->set_subscribe_callback([this] (const fc::variant& v) {
auto& updates = v.get_array();
for( const auto& v : updates )
{
if( v.is_object() )
{
auto& obj = v.get_object();
if( obj["id"].as<graphene::chain::object_id_type>() == graphene::chain::dynamic_global_property_id_type() )
{
auto props = v.as<graphene::chain::dynamic_global_property_object>();
sync_with_trusted_node(props.head_block_number);
}
}
}
}, true);
// Go ahead and get in sync now, before subscribing
chain::dynamic_global_property_object props = my->database_api->get_dynamic_global_properties();
sync_with_trusted_node(props.head_block_number);
my->database_api->subscribe_to_objects([this] (const fc::variant& v) {
auto props = v.as<graphene::chain::dynamic_global_property_object>();
sync_with_trusted_node(props.head_block_number);
}, {graphene::chain::dynamic_global_property_id_type()});
return;
} catch (const fc::exception& e) {
elog("Error during connection: ${e}", ("e", e.to_detail_string()));

View file

@ -375,10 +375,6 @@ public:
("chain_id", _chain_id) );
}
init_prototype_ops();
_remote_db->subscribe_to_objects( [=]( const fc::variant& obj )
{
fc::async([this]{resync();}, "Resync after block");
}, {dynamic_global_property_id_type()} );
_wallet.chain_id = _chain_id;
_wallet.ws_server = initial_data.ws_server;
_wallet.ws_user = initial_data.ws_user;
@ -629,10 +625,7 @@ public:
_keys[wif_pub_key] = wif_key;
if( _wallet.update_account(account) )
_remote_db->subscribe_to_objects([this](const fc::variant& v) {
_wallet.update_account(v.as<account_object>());
}, {account.id});
_wallet.update_account(account);
_wallet.extra_keys[account.id].insert(wif_pub_key);
@ -649,17 +642,11 @@ public:
if( ! fc::exists( wallet_filename ) )
return false;
if( !_wallet.my_accounts.empty() )
_remote_db->unsubscribe_from_objects(_wallet.my_account_ids());
_wallet = fc::json::from_file( wallet_filename ).as< wallet_data >();
if( _wallet.chain_id != _chain_id )
FC_THROW( "Wallet chain ID does not match",
("wallet.chain_id", _wallet.chain_id)
("chain_id", _chain_id) );
if( !_wallet.my_accounts.empty() )
_remote_db->subscribe_to_objects([this](const fc::variant& v) {
_wallet.update_account(v.as<account_object>());
}, _wallet.my_account_ids());
return true;
}
void save_wallet_file(string wallet_filename = "")

View file

@ -6,7 +6,7 @@ add_subdirectory( size_checker )
set(BUILD_QT_GUI FALSE CACHE BOOL "Build the Qt-based light client GUI")
if(BUILD_QT_GUI)
add_subdirectory(light_client)
# add_subdirectory(light_client)
endif()
set(BUILD_WEB_NODE FALSE CACHE BOOL "Build the Qt-based full node with web GUI")
if(BUILD_WEB_NODE)