Merge branch 'witness_schedule_refactor'

Also fix object_id addition
This commit is contained in:
Daniel Larimer 2015-08-27 11:33:42 -04:00
commit 2a494d9de0
13 changed files with 152 additions and 236 deletions

View file

@ -4,6 +4,7 @@ add_subdirectory( deterministic_openssl_rand )
add_subdirectory( chain )
add_subdirectory( egenesis )
add_subdirectory( net )
#add_subdirectory( p2p )
add_subdirectory( time )
add_subdirectory( utilities )
add_subdirectory( app )

View file

@ -52,8 +52,45 @@ namespace graphene { namespace app {
elog("freeing database api ${x}", ("x",int64_t(this)) );
}
void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
{
edump((clear_filter));
_subscribe_callback = cb;
if( clear_filter || !cb )
{
static fc::bloom_parameters param;
param.projected_element_count = 10000;
param.false_positive_probability = 1.0/10000;
param.maximum_size = 1024*8*8*2;
param.compute_optimal_parameters();
_subscribe_filter = fc::bloom_filter(param);
}
}
void database_api::subscribe_to_id( object_id_type id )const
{
idump((id));
if( _subscribe_callback )
_subscribe_filter.insert( (const unsigned char*)&id, sizeof(id) );
else
elog( "unable to subscribe to id because there is no subscribe callback set" );
}
fc::variants database_api::get_objects(const vector<object_id_type>& ids)const
{
if( _subscribe_callback ) {
for( auto id : ids )
{
if( id.type() == operation_history_object_type && id.space() == protocol_ids ) continue;
if( id.type() == impl_account_transaction_history_object_type && id.space() == implementation_ids ) continue;
subscribe_to_id( id );
}
}
else
{
elog( "getObjects without subscribe callback??" );
}
fc::variants result;
result.reserve(ids.size());
@ -150,7 +187,10 @@ namespace graphene { namespace app {
std::transform(account_ids.begin(), account_ids.end(), std::back_inserter(result),
[this](account_id_type id) -> optional<account_object> {
if(auto o = _db.find(id))
{
subscribe_to_id( id );
return *o;
}
return {};
});
return result;
@ -162,7 +202,10 @@ namespace graphene { namespace app {
std::transform(asset_ids.begin(), asset_ids.end(), std::back_inserter(result),
[this](asset_id_type id) -> optional<asset_object> {
if(auto o = _db.find(id))
{
subscribe_to_id( id );
return *o;
}
return {};
});
return result;
@ -182,35 +225,17 @@ namespace graphene { namespace app {
for( auto itr = accounts_by_name.lower_bound(lower_bound_name);
limit-- && itr != accounts_by_name.end();
++itr )
{
result.insert(make_pair(itr->name, itr->get_id()));
if( limit == 1 )
subscribe_to_id( itr->get_id() );
}
return result;
}
void database_api::unsubscribe_from_accounts( const vector<std::string>& names_or_ids )
{
for (const std::string& account_name_or_id : names_or_ids)
{
const account_object* account = nullptr;
if (std::isdigit(account_name_or_id[0]))
account = _db.find(fc::variant(account_name_or_id).as<account_id_type>());
else
{
const auto& idx = _db.get_index_type<account_index>().indices().get<by_name>();
auto itr = idx.find(account_name_or_id);
if (itr != idx.end())
account = &*itr;
}
if (account == nullptr)
continue;
_account_subscriptions.erase(account->id);
}
}
std::map<std::string, full_account> database_api::get_full_accounts(std::function<void(const variant&)> callback,
const vector<std::string>& names_or_ids, bool subscribe)
std::map<std::string, full_account> database_api::get_full_accounts( const vector<std::string>& names_or_ids, bool subscribe)
{
FC_ASSERT( _account_subscriptions.size() < 1024 );
std::map<std::string, full_account> results;
for (const std::string& account_name_or_id : names_or_ids)
@ -231,12 +256,7 @@ namespace graphene { namespace app {
if( subscribe )
{
ilog( "subscribe to ${id}", ("id",account->name) );
_account_subscriptions[account->id] = callback;
}
else
{
wlog( "unsubscribe to ${id}", ("id",account->name) );
_account_subscriptions.erase(account->id);
subscribe_to_id( account->id );
}
// fc::mutable_variant_object full_account;
@ -794,7 +814,7 @@ namespace graphene { namespace app {
/// we need to ensure the database_api is not deleted for the life of the async operation
auto capture_this = shared_from_this();
if( _account_subscriptions.size() )
if( _subscribe_callback )
{
map<account_id_type, vector<variant> > broadcast_queue;
for( const auto& obj : objs )
@ -802,24 +822,24 @@ namespace graphene { namespace app {
auto relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
auto sub = _account_subscriptions.find(r);
if( sub != _account_subscriptions.end() )
if( _subscribe_filter.contains(r) )
{
broadcast_queue[r].emplace_back(obj->to_variant());
break;
}
}
if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) )
broadcast_queue[account_id_type()].emplace_back(obj->to_variant());
}
if( broadcast_queue.size() )
{
fc::async([capture_this,broadcast_queue,this](){
for( const auto& item : broadcast_queue )
{
auto sub = _account_subscriptions.find(item.first);
if( sub != _account_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
_subscribe_callback( fc::variant(broadcast_queue) );
});
}
}
if( _market_subscriptions.size() )
{
map< pair<asset_id_type, asset_id_type>, vector<variant> > broadcast_queue;
@ -849,16 +869,14 @@ namespace graphene { namespace app {
void database_api::on_objects_changed(const vector<object_id_type>& ids)
{
vector<object_id_type> my_objects;
map<account_id_type, vector<variant> > broadcast_queue;
vector<variant> updates;
map< pair<asset_id_type, asset_id_type>, vector<variant> > market_broadcast_queue;
idump((ids));
for(auto id : ids)
{
if(_subscriptions.find(id) != _subscriptions.end())
my_objects.push_back(id);
const object* obj = nullptr;
if( _account_subscriptions.size() )
if( _subscribe_callback )
{
obj = _db.find_object( id );
if( obj )
@ -866,18 +884,26 @@ namespace graphene { namespace app {
vector<account_id_type> relevant = get_relevant_accounts( obj );
for( const auto& r : relevant )
{
auto sub = _account_subscriptions.find(r);
if( sub != _account_subscriptions.end() )
broadcast_queue[r].emplace_back(obj->to_variant());
if( _subscribe_filter.contains(r) )
{
updates.emplace_back(obj->to_variant());
break;
}
}
if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) )
updates.emplace_back(obj->to_variant());
}
else
elog( "unable to find object ${id}", ("id",id) );
{
if( _subscribe_filter.contains(id) )
updates.emplace_back(id); // send just the id to indicate removal
}
}
if( _market_subscriptions.size() )
{
if( !_account_subscriptions.size() ) obj = _db.find_object( id );
if( !_subscribe_callback )
obj = _db.find_object( id );
if( obj )
{
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
@ -896,42 +922,15 @@ namespace graphene { namespace app {
/// pushing the future back / popping the prior future if it is complete.
/// if a connection hangs then this could get backed up and result in
/// a failure to exit cleanly.
fc::async([capture_this,this,broadcast_queue,market_broadcast_queue,my_objects](){
for( const auto& item : broadcast_queue )
{
edump( (item) );
try {
auto sub = _account_subscriptions.find(item.first);
if( sub != _account_subscriptions.end() )
sub->second( fc::variant(item.second ) );
} catch ( const fc::exception& e )
{
edump((e.to_detail_string()));
}
}
fc::async([capture_this,this,updates,market_broadcast_queue](){
if( _subscribe_callback ) _subscribe_callback( updates );
for( const auto& item : market_broadcast_queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
for(auto id : my_objects)
{
// just incase _usbscriptions changed between filter and broadcast
auto itr = _subscriptions.find( id );
if( itr != _subscriptions.end() )
{
const object* obj = _db.find_object( id );
if( obj != nullptr )
{
itr->second(obj->to_variant());
}
else
{
itr->second(fc::variant(id));
}
}
}
});
}
@ -979,20 +978,6 @@ namespace graphene { namespace app {
});
}
vector<variant> database_api::subscribe_to_objects( const std::function<void(const fc::variant&)>& callback, const vector<object_id_type>& ids)
{
FC_ASSERT( _subscriptions.size() < 1024 );
for(auto id : ids) _subscriptions[id] = callback;
return get_objects( ids );
}
bool database_api::unsubscribe_from_objects(const vector<object_id_type>& ids)
{
for(auto id : ids) _subscriptions.erase(id);
return true;
}
void database_api::subscribe_to_market(std::function<void(const variant&)> callback, asset_id_type a, asset_id_type b)
{
if(a > b) std::swap(a,b);

View file

@ -40,6 +40,7 @@
#include <graphene/net/node.hpp>
#include <fc/api.hpp>
#include <fc/bloom_filter.hpp>
namespace graphene { namespace app {
using namespace graphene::chain;
@ -176,13 +177,8 @@ namespace graphene { namespace app {
* ignored. All other accounts will be retrieved and subscribed.
*
*/
std::map<string,full_account> get_full_accounts(std::function<void(const variant&)> callback,
const vector<string>& names_or_ids, bool subscribe );
std::map<string,full_account> get_full_accounts( const vector<string>& names_or_ids, bool subscribe );
/**
* Stop receiving updates generated by get_full_accounts()
*/
void unsubscribe_from_accounts( const vector<string>& names_or_ids );
/**
* @brief Get limit orders in a given market
@ -277,24 +273,6 @@ namespace graphene { namespace app {
*/
vector<optional<committee_member_object>> get_committee_members(const vector<committee_member_id_type>& committee_member_ids)const;
/**
* @group Push Notification Methods
* These methods may be used to get push notifications whenever an object or market is changed
* @{
*/
/**
* @brief Request notifications when some object(s) change
* @param callback Callback method which is called with the new version of a changed object
* @param ids The set of object IDs to watch
* @return get_objects(ids)
*/
vector<variant> subscribe_to_objects(const std::function<void(const fc::variant&)>& callback,
const vector<object_id_type>& ids);
/**
* @brief Stop receiving notifications for some object(s)
* @param ids The set of object IDs to stop watching
*/
bool unsubscribe_from_objects(const vector<object_id_type>& ids);
/**
* @brief Request notification when the active orders in the market between two assets changes
* @param callback Callback method which is called when the market changes
@ -318,7 +296,7 @@ namespace graphene { namespace app {
* This unsubscribes from all subscribed markets and objects.
*/
void cancel_all_subscriptions()
{ _subscriptions.clear(); _market_subscriptions.clear(); }
{ set_subscribe_callback( std::function<void(const fc::variant&)>(), true); _market_subscriptions.clear(); }
///@}
/// @brief Get a hexdump of the serialized binary form of a transaction
@ -377,18 +355,22 @@ namespace graphene { namespace app {
*/
vector<asset> get_required_fees( const vector<operation>& ops, asset_id_type id = asset_id_type() )const;
void set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter );
private:
void subscribe_to_id( object_id_type id )const;
/** called every time a block is applied to report the objects that were changed */
void on_objects_changed(const vector<object_id_type>& ids);
void on_objects_removed(const vector<const object*>& objs);
void on_applied_block();
mutable fc::bloom_filter _subscribe_filter;
std::function<void(const fc::variant&)> _subscribe_callback;
boost::signals2::scoped_connection _change_connection;
boost::signals2::scoped_connection _removed_connection;
boost::signals2::scoped_connection _applied_block_connection;
map<object_id_type, std::function<void(const fc::variant&)> > _subscriptions;
map<account_id_type, std::function<void(const fc::variant&)> > _account_subscriptions;
map< pair<asset_id_type,asset_id_type>, std::function<void(const variant&)> > _market_subscriptions;
map< pair<asset_id_type,asset_id_type>, std::function<void(const variant&)> > _market_subscriptions;
graphene::chain::database& _db;
};
@ -561,7 +543,6 @@ FC_API(graphene::app::database_api,
(get_account_count)
(lookup_accounts)
(get_full_accounts)
(unsubscribe_from_accounts)
(get_account_balances)
(get_named_account_balances)
(lookup_asset_symbols)
@ -577,8 +558,6 @@ FC_API(graphene::app::database_api,
(get_witness_count)
(lookup_witness_accounts)
(lookup_committee_member_accounts)
(subscribe_to_objects)
(unsubscribe_from_objects)
(subscribe_to_market)
(unsubscribe_from_market)
(cancel_all_subscriptions)
@ -594,6 +573,7 @@ FC_API(graphene::app::database_api,
(verify_authority)
(get_blinded_balances)
(get_required_fees)
(set_subscribe_callback)
)
FC_API(graphene::app::history_api,
(get_account_history)

View file

@ -44,7 +44,7 @@ void database::update_global_dynamic_data( const signed_block& b )
modify( _dgp, [&]( dynamic_global_property_object& dgp ){
if( BOOST_UNLIKELY( b.block_num() == 1 ) )
dgp.recently_missed_count = 0;
else if( _checkpoints.size() && _checkpoints.rbegin()->first >= b.block_num() )
else if( _checkpoints.size() && _checkpoints.rbegin()->first >= b.block_num() )
dgp.recently_missed_count = 0;
else if( missed_blocks )
dgp.recently_missed_count += GRAPHENE_RECENTLY_MISSED_COUNT_INCREMENT*missed_blocks;

View file

@ -45,12 +45,11 @@ witness_id_type database::get_scheduled_witness( uint32_t slot_num )const
const flat_set< witness_id_type >& active_witnesses = get_global_properties().active_witnesses;
uint32_t n = active_witnesses.size();
uint64_t min_witness_separation = (n / 2)+1;
uint64_t min_witness_separation = (n / 2); /// should work in cases where n is 0,1, and 2
uint64_t current_aslot = get_dynamic_global_properties().current_aslot + slot_num;
uint64_t start_of_current_round_aslot = current_aslot - (current_aslot % n);
uint64_t first_ineligible_aslot = std::min(
start_of_current_round_aslot, current_aslot - min_witness_separation );
uint64_t first_ineligible_aslot = std::min( start_of_current_round_aslot + 1, current_aslot - min_witness_separation );
//
// overflow analysis of above subtraction:
//
@ -76,7 +75,9 @@ witness_id_type database::get_scheduled_witness( uint32_t slot_num )const
if( wit.last_aslot >= first_ineligible_aslot )
continue;
uint64_t k = now_hi + uint64_t(wit_id);
/// High performance random generator
/// http://xorshift.di.unimi.it/
uint64_t k = now_hi + uint64_t(wit_id)*2685821657736338717ULL;
k ^= (k >> 12);
k ^= (k << 25);
k ^= (k >> 27);
@ -93,8 +94,19 @@ witness_id_type database::get_scheduled_witness( uint32_t slot_num )const
// at most K elements are susceptible to the filter,
// otherwise we have an inconsistent database (such as
// wit.last_aslot values that are non-unique or in the future)
if( !success ) {
edump((best_k)(slot_num)(first_ineligible_aslot)(current_aslot)(start_of_current_round_aslot)(min_witness_separation)(active_witnesses.size()));
for( const witness_id_type& wit_id : active_witnesses )
{
const witness_object& wit = wit_id(*this);
if( wit.last_aslot >= first_ineligible_aslot )
idump((wit_id)(wit.last_aslot));
}
assert( success );
}
assert( success );
return best_wit;
}

View file

@ -44,6 +44,7 @@ namespace graphene { namespace chain {
struct by_account;
struct by_vote_id;
struct by_last_block;
using witness_multi_index_type = multi_index_container<
witness_object,
indexed_by<

View file

@ -54,6 +54,12 @@ namespace graphene { namespace db {
object_id_type& operator++(int) { ++number; return *this; }
object_id_type& operator++() { ++number; return *this; }
friend object_id_type operator+(const object_id_type& a, int delta ) {
return object_id_type( a.space(), a.type(), a.instance() + delta );
}
friend object_id_type operator+(const object_id_type& a, int64_t delta ) {
return object_id_type( a.space(), a.type(), a.instance() + delta );
}
friend size_t hash_value( object_id_type v ) { return std::hash<uint64_t>()(v.number); }
friend bool operator < ( const object_id_type& a, const object_id_type& b )
@ -83,6 +89,10 @@ namespace graphene { namespace db {
object_id( object_id_type id ):instance(id.instance())
{
}
friend object_id operator+(const object_id a, int64_t delta ) { return object_id( uint64_t(a.instance.value+delta) ); }
friend object_id operator+(const object_id a, int delta ) { return object_id( uint64_t(a.instance.value+delta) ); }
operator object_id_type()const { return object_id_type( SpaceID, TypeID, instance.value ); }
operator uint64_t()const { return object_id_type( *this ).number; }

View file

@ -97,14 +97,26 @@ void delayed_node_plugin::plugin_startup()
try {
connect();
my->database_api->set_subscribe_callback([this] (const fc::variant& v) {
auto& updates = v.get_array();
for( const auto& v : updates )
{
if( v.is_object() )
{
auto& obj = v.get_object();
if( obj["id"].as<graphene::chain::object_id_type>() == graphene::chain::dynamic_global_property_id_type() )
{
auto props = v.as<graphene::chain::dynamic_global_property_object>();
sync_with_trusted_node(props.head_block_number);
}
}
}
}, true);
// Go ahead and get in sync now, before subscribing
chain::dynamic_global_property_object props = my->database_api->get_dynamic_global_properties();
sync_with_trusted_node(props.head_block_number);
my->database_api->subscribe_to_objects([this] (const fc::variant& v) {
auto props = v.as<graphene::chain::dynamic_global_property_object>();
sync_with_trusted_node(props.head_block_number);
}, {graphene::chain::dynamic_global_property_id_type()});
return;
} catch (const fc::exception& e) {
elog("Error during connection: ${e}", ("e", e.to_detail_string()));

View file

@ -375,10 +375,6 @@ public:
("chain_id", _chain_id) );
}
init_prototype_ops();
_remote_db->subscribe_to_objects( [=]( const fc::variant& obj )
{
fc::async([this]{resync();}, "Resync after block");
}, {dynamic_global_property_id_type()} );
_wallet.chain_id = _chain_id;
_wallet.ws_server = initial_data.ws_server;
_wallet.ws_user = initial_data.ws_user;
@ -629,10 +625,7 @@ public:
_keys[wif_pub_key] = wif_key;
if( _wallet.update_account(account) )
_remote_db->subscribe_to_objects([this](const fc::variant& v) {
_wallet.update_account(v.as<account_object>());
}, {account.id});
_wallet.update_account(account);
_wallet.extra_keys[account.id].insert(wif_pub_key);
@ -649,17 +642,11 @@ public:
if( ! fc::exists( wallet_filename ) )
return false;
if( !_wallet.my_accounts.empty() )
_remote_db->unsubscribe_from_objects(_wallet.my_account_ids());
_wallet = fc::json::from_file( wallet_filename ).as< wallet_data >();
if( _wallet.chain_id != _chain_id )
FC_THROW( "Wallet chain ID does not match",
("wallet.chain_id", _wallet.chain_id)
("chain_id", _chain_id) );
if( !_wallet.my_accounts.empty() )
_remote_db->subscribe_to_objects([this](const fc::variant& v) {
_wallet.update_account(v.as<account_object>());
}, _wallet.my_account_ids());
return true;
}
void save_wallet_file(string wallet_filename = "")

View file

@ -6,7 +6,7 @@ add_subdirectory( size_checker )
set(BUILD_QT_GUI FALSE CACHE BOOL "Build the Qt-based light client GUI")
if(BUILD_QT_GUI)
add_subdirectory(light_client)
# add_subdirectory(light_client)
endif()
set(BUILD_WEB_NODE FALSE CACHE BOOL "Build the Qt-based full node with web GUI")
if(BUILD_WEB_NODE)

View file

@ -24,6 +24,7 @@
#include <graphene/chain/market_evaluator.hpp>
#include <graphene/chain/account_object.hpp>
#include <graphene/chain/balance_object.hpp>
#include <graphene/chain/committee_member_object.hpp>
#include <fc/smart_ref_impl.hpp>
#include <iostream>

View file

@ -64,6 +64,20 @@ int main( int argc, char** argv )
{
graphene::chain::operation op;
vector<uint64_t> witnesses; witnesses.resize(50);
for( uint32_t i = 0; i < 60*60*24*30; ++i )
{
witnesses[ rand() % 50 ]++;
}
std::sort( witnesses.begin(), witnesses.end() );
idump((witnesses.back() - witnesses.front()) );
idump((60*60*24*30/50));
idump(("deviation: ")((60*60*24*30/50-witnesses.front())/(60*60*24*30/50.0)));
idump( (witnesses) );
for( int32_t i = 0; i < op.count(); ++i )
{
op.set_which(i);

View file

@ -313,93 +313,6 @@ BOOST_FIXTURE_TEST_CASE( witness_order_mc_test, database_fixture )
}
}
/**
* To have a secure random number we need to ensure that the same
* witness does not get to produce two blocks in a row. There is
* always a chance that the last witness of one round will be the
* first witness of the next round.
*
* This means that when we shuffle witness we need to make sure
* that there is at least N/2 witness between consecutive turns
* of the same witness. This means that durring the random
* shuffle we need to restrict the placement of witness to maintain
* this invariant.
*
* This test checks the requirement using Monte Carlo approach
* (produce lots of blocks and check the invariant holds).
*/
BOOST_FIXTURE_TEST_CASE( generic_scheduler_mc_test, database_fixture )
{
try {
size_t num_witnesses = db.get_global_properties().active_witnesses.size();
size_t dmin = num_witnesses >> 1;
witness_scheduler_rng rng(
// - - - - + - - - - 1 - - - - + - - - - 2 - - - - + - - -
"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"
);
witness_scheduler scheduler;
vector< witness_id_type > witness_ids;
witness_ids.reserve( num_witnesses );
for( size_t i=0; i<num_witnesses; i++ )
witness_ids.push_back( witness_id_type(i) );
scheduler._min_token_count = num_witnesses / 2;
scheduler.insert_all( witness_ids );
for( size_t i=0; i<num_witnesses; i++ )
scheduler.produce_schedule( rng );
vector< witness_id_type > cur_round;
vector< witness_id_type > full_schedule;
// if we make the maximum witness count testable,
// we'll need to enlarge this.
std::bitset< 0x40 > witness_seen;
size_t total_blocks = 1000000;
cur_round.reserve( num_witnesses );
full_schedule.reserve( total_blocks );
// we assert so the test doesn't continue, which would
// corrupt memory
assert( num_witnesses <= witness_seen.size() );
while( full_schedule.size() < total_blocks )
{
scheduler.produce_schedule( rng );
witness_id_type wid = scheduler.consume_schedule();
full_schedule.push_back( wid );
cur_round.push_back( wid );
if( cur_round.size() == num_witnesses )
{
// check that the current round contains exactly 1 copy
// of each witness
witness_seen.reset();
for( const witness_id_type& w : cur_round )
{
uint64_t inst = w.instance.value;
BOOST_CHECK( !witness_seen.test( inst ) );
assert( !witness_seen.test( inst ) );
witness_seen.set( inst );
}
cur_round.clear();
}
}
for( size_t i=0,m=full_schedule.size(); i<m; i++ )
{
for( size_t j=i+1,n=std::min( m, i+dmin ); j<n; j++ )
{
BOOST_CHECK( full_schedule[i] != full_schedule[j] );
assert( full_schedule[i] != full_schedule[j] );
}
}
} catch (fc::exception& e) {
edump((e.to_detail_string()));
throw;
}
}
BOOST_FIXTURE_TEST_CASE( tapos_rollover, database_fixture )
{