From 60b5228818c92f4d13b0a054956a5f834c7f7549 Mon Sep 17 00:00:00 2001 From: theoreticalbts Date: Mon, 5 Oct 2015 10:57:36 -0400 Subject: [PATCH 1/2] Implement remove.py --- programs/genesis_util/remove.py | 71 +++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100755 programs/genesis_util/remove.py diff --git a/programs/genesis_util/remove.py b/programs/genesis_util/remove.py new file mode 100755 index 00000000..ec464258 --- /dev/null +++ b/programs/genesis_util/remove.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 + +import argparse +import json +import sys + +def dump_json(obj, out, pretty): + if pretty: + json.dump(obj, out, indent=2, sort_keys=True) + else: + json.dump(obj, out, separators=(",", ":"), sort_keys=True) + return + +def main(): + parser = argparse.ArgumentParser(description="Remove entities from snapshot") + parser.add_argument("-o", "--output", metavar="OUT", default="-", help="output filename (default: stdout)") + parser.add_argument("-i", "--input", metavar="IN", default="-", help="input filename (default: stdin)") + parser.add_argument("-a", "--asset", metavar="ASSETS", nargs="+", help="list of asset(s) to delete") + parser.add_argument("-p", "--pretty", action="store_true", default=False, help="pretty print output") + opts = parser.parse_args() + + if opts.input == "-": + genesis = json.load(sys.stdin) + else: + with open(opts.input, "r") as f: + genesis = json.load(f) + + if opts.asset is None: + opts.asset = [] + rm_asset_set = set(opts.asset) + + removed_asset_entries = {aname : 0 for aname in opts.asset} + new_initial_assets = [] + for asset in genesis["initial_assets"]: + symbol = asset["symbol"] + if symbol not in rm_asset_set: + new_initial_assets.append(asset) + else: + removed_asset_entries[symbol] += 1 + genesis["initial_assets"] = new_initial_assets + + removed_balance_entries = {aname : [] for aname in opts.asset} + new_initial_balances = [] + for balance in genesis["initial_balances"]: + symbol = balance["asset_symbol"] + if symbol not in rm_asset_set: + new_initial_balances.append(balance) + else: + removed_balance_entries[symbol].append(balance) + genesis["initial_balances"] = new_initial_balances + # TODO: Remove from initial_vesting_balances + + for aname in opts.asset: + sys.stderr.write( + "Asset {sym} removed {acount} initial_assets, {bcount} initial_balances totaling {btotal}\n".format( + sym=aname, + acount=removed_asset_entries[aname], + bcount=len(removed_balance_entries[aname]), + btotal=sum(int(e["amount"]) for e in removed_balance_entries[aname]), + )) + + if opts.output == "-": + dump_json( genesis, sys.stdout, opts.pretty ) + sys.stdout.flush() + else: + with open(opts.output, "w") as f: + dump_json( genesis, f, opts.pretty ) + return + +if __name__ == "__main__": + main() From 36625805fba0fb88ad836fd8db368244e4232d62 Mon Sep 17 00:00:00 2001 From: theoreticalbts Date: Mon, 5 Oct 2015 14:25:35 -0400 Subject: [PATCH 2/2] delayed_node_plugin.cpp: Use new last_irreversible_block property, change to polling architecture --- .../delayed_node/delayed_node_plugin.cpp | 108 +++++++++++------- .../delayed_node/delayed_node_plugin.hpp | 3 +- 2 files changed, 66 insertions(+), 45 deletions(-) diff --git a/libraries/plugins/delayed_node/delayed_node_plugin.cpp b/libraries/plugins/delayed_node/delayed_node_plugin.cpp index 92079c9c..5e9e1bc8 100644 --- a/libraries/plugins/delayed_node/delayed_node_plugin.cpp +++ b/libraries/plugins/delayed_node/delayed_node_plugin.cpp @@ -17,8 +17,8 @@ */ #include +#include #include -#include #include #include @@ -33,12 +33,12 @@ namespace bpo = boost::program_options; namespace detail { struct delayed_node_plugin_impl { std::string remote_endpoint; - int delay_blocks; fc::http::websocket_client client; std::shared_ptr client_connection; fc::api database_api; boost::signals2::scoped_connection client_connection_closed; - bool currently_fetching = false; + graphene::chain::block_id_type last_received_remote_head; + graphene::chain::block_id_type last_processed_remote_head; }; } @@ -53,7 +53,6 @@ void delayed_node_plugin::plugin_set_program_options(bpo::options_description& c { cli.add_options() ("trusted-node", boost::program_options::value()->required(), "RPC endpoint of a trusted validating node (required)") - ("delay-block-count", boost::program_options::value()->required(), "Number of blocks to delay before advancing chain state (required)") ; cfg.add(cli); } @@ -70,58 +69,79 @@ void delayed_node_plugin::connect() void delayed_node_plugin::plugin_initialize(const boost::program_options::variables_map& options) { my->remote_endpoint = "ws://" + options.at("trusted-node").as(); - my->delay_blocks = options.at("delay-block-count").as(); } -void delayed_node_plugin::sync_with_trusted_node(uint32_t remote_head_block_num) +void delayed_node_plugin::sync_with_trusted_node() { - struct raii { - bool* target; - ~raii() { - *target = false; + auto& db = database(); + uint32_t synced_blocks = 0; + uint32_t pass_count = 0; + while( true ) + { + graphene::chain::dynamic_global_property_object remote_dpo = my->database_api->get_dynamic_global_properties(); + if( remote_dpo.last_irreversible_block_num <= db.head_block_num() ) + { + if( remote_dpo.last_irreversible_block_num < db.head_block_num() ) + { + wlog( "Trusted node seems to be behind delayed node" ); + } + if( synced_blocks > 1 ) + { + ilog( "Delayed node finished syncing ${n} blocks in ${k} passes", ("n", synced_blocks)("k", pass_count) ); + } + break; } - }; + pass_count++; + while( remote_dpo.last_irreversible_block_num > db.head_block_num() ) + { + fc::optional block = my->database_api->get_block( db.head_block_num()+1 ); + FC_ASSERT(block, "Trusted node claims it has blocks it doesn't actually have."); + ilog("Pushing block #${n}", ("n", block->block_num())); + db.push_block(*block); + synced_blocks++; + } + } +} - if (my->currently_fetching) return; - raii releaser{&my->currently_fetching}; - my->currently_fetching = true; +void delayed_node_plugin::mainloop() +{ + while( true ) + { + try + { + fc::usleep( fc::microseconds( 296645 ) ); // wake up a little over 3Hz - auto head_block = database().head_block_num(); - while (remote_head_block_num - head_block > my->delay_blocks) { - fc::optional block = my->database_api->get_block(++head_block); - FC_ASSERT(block, "Trusted node claims it has blocks it doesn't actually have."); - ilog("Pushing block #${n}", ("n", block->block_num())); - database().push_block(*block); + if( my->last_received_remote_head == my->last_processed_remote_head ) + continue; + + sync_with_trusted_node(); + my->last_processed_remote_head = my->last_received_remote_head; + } + catch( const fc::exception& e ) + { + elog("Error during connection: ${e}", ("e", e.to_detail_string())); + } } } void delayed_node_plugin::plugin_startup() { - try { + fc::async([this]() + { + mainloop(); + }); + + try + { connect(); - - my->database_api->set_subscribe_callback([this] (const fc::variant& v) { - auto& updates = v.get_array(); - for( const auto& v : updates ) - { - if( v.is_object() ) - { - auto& obj = v.get_object(); - if( obj["id"].as() == graphene::chain::dynamic_global_property_id_type() ) - { - auto props = v.as(); - sync_with_trusted_node(props.head_block_number); - } - } - } - }, true); - - // Go ahead and get in sync now, before subscribing - chain::dynamic_global_property_object props = my->database_api->get_dynamic_global_properties(); - sync_with_trusted_node(props.head_block_number); - + my->database_api->set_block_applied_callback([this]( const fc::variant& block_id ) + { + fc::from_variant( block_id, my->last_received_remote_head ); + } ); return; - } catch (const fc::exception& e) { + } + catch (const fc::exception& e) + { elog("Error during connection: ${e}", ("e", e.to_detail_string())); } fc::async([this]{connection_failed();}); @@ -130,7 +150,7 @@ void delayed_node_plugin::plugin_startup() void delayed_node_plugin::connection_failed() { elog("Connection to trusted node failed; retrying in 5 seconds..."); - fc::schedule([this]{plugin_startup();}, fc::time_point::now() + fc::seconds(5)); + fc::schedule([this]{connect();}, fc::time_point::now() + fc::seconds(5)); } } } diff --git a/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp b/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp index c25b1203..3865411e 100644 --- a/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp +++ b/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp @@ -34,11 +34,12 @@ public: boost::program_options::options_description& cfg) override; virtual void plugin_initialize(const boost::program_options::variables_map& options) override; virtual void plugin_startup() override; + void mainloop(); protected: void connection_failed(); void connect(); - void sync_with_trusted_node(uint32_t remote_head_block_num); + void sync_with_trusted_node(); }; } } //graphene::account_history