diff --git a/libraries/plugins/delayed_node/delayed_node_plugin.cpp b/libraries/plugins/delayed_node/delayed_node_plugin.cpp index 92079c9c..5e9e1bc8 100644 --- a/libraries/plugins/delayed_node/delayed_node_plugin.cpp +++ b/libraries/plugins/delayed_node/delayed_node_plugin.cpp @@ -17,8 +17,8 @@ */ #include +#include #include -#include #include #include @@ -33,12 +33,12 @@ namespace bpo = boost::program_options; namespace detail { struct delayed_node_plugin_impl { std::string remote_endpoint; - int delay_blocks; fc::http::websocket_client client; std::shared_ptr client_connection; fc::api database_api; boost::signals2::scoped_connection client_connection_closed; - bool currently_fetching = false; + graphene::chain::block_id_type last_received_remote_head; + graphene::chain::block_id_type last_processed_remote_head; }; } @@ -53,7 +53,6 @@ void delayed_node_plugin::plugin_set_program_options(bpo::options_description& c { cli.add_options() ("trusted-node", boost::program_options::value()->required(), "RPC endpoint of a trusted validating node (required)") - ("delay-block-count", boost::program_options::value()->required(), "Number of blocks to delay before advancing chain state (required)") ; cfg.add(cli); } @@ -70,58 +69,79 @@ void delayed_node_plugin::connect() void delayed_node_plugin::plugin_initialize(const boost::program_options::variables_map& options) { my->remote_endpoint = "ws://" + options.at("trusted-node").as(); - my->delay_blocks = options.at("delay-block-count").as(); } -void delayed_node_plugin::sync_with_trusted_node(uint32_t remote_head_block_num) +void delayed_node_plugin::sync_with_trusted_node() { - struct raii { - bool* target; - ~raii() { - *target = false; + auto& db = database(); + uint32_t synced_blocks = 0; + uint32_t pass_count = 0; + while( true ) + { + graphene::chain::dynamic_global_property_object remote_dpo = my->database_api->get_dynamic_global_properties(); + if( remote_dpo.last_irreversible_block_num <= db.head_block_num() ) + { + if( remote_dpo.last_irreversible_block_num < db.head_block_num() ) + { + wlog( "Trusted node seems to be behind delayed node" ); + } + if( synced_blocks > 1 ) + { + ilog( "Delayed node finished syncing ${n} blocks in ${k} passes", ("n", synced_blocks)("k", pass_count) ); + } + break; } - }; + pass_count++; + while( remote_dpo.last_irreversible_block_num > db.head_block_num() ) + { + fc::optional block = my->database_api->get_block( db.head_block_num()+1 ); + FC_ASSERT(block, "Trusted node claims it has blocks it doesn't actually have."); + ilog("Pushing block #${n}", ("n", block->block_num())); + db.push_block(*block); + synced_blocks++; + } + } +} - if (my->currently_fetching) return; - raii releaser{&my->currently_fetching}; - my->currently_fetching = true; +void delayed_node_plugin::mainloop() +{ + while( true ) + { + try + { + fc::usleep( fc::microseconds( 296645 ) ); // wake up a little over 3Hz - auto head_block = database().head_block_num(); - while (remote_head_block_num - head_block > my->delay_blocks) { - fc::optional block = my->database_api->get_block(++head_block); - FC_ASSERT(block, "Trusted node claims it has blocks it doesn't actually have."); - ilog("Pushing block #${n}", ("n", block->block_num())); - database().push_block(*block); + if( my->last_received_remote_head == my->last_processed_remote_head ) + continue; + + sync_with_trusted_node(); + my->last_processed_remote_head = my->last_received_remote_head; + } + catch( const fc::exception& e ) + { + elog("Error during connection: ${e}", ("e", e.to_detail_string())); + } } } void delayed_node_plugin::plugin_startup() { - try { + fc::async([this]() + { + mainloop(); + }); + + try + { connect(); - - my->database_api->set_subscribe_callback([this] (const fc::variant& v) { - auto& updates = v.get_array(); - for( const auto& v : updates ) - { - if( v.is_object() ) - { - auto& obj = v.get_object(); - if( obj["id"].as() == graphene::chain::dynamic_global_property_id_type() ) - { - auto props = v.as(); - sync_with_trusted_node(props.head_block_number); - } - } - } - }, true); - - // Go ahead and get in sync now, before subscribing - chain::dynamic_global_property_object props = my->database_api->get_dynamic_global_properties(); - sync_with_trusted_node(props.head_block_number); - + my->database_api->set_block_applied_callback([this]( const fc::variant& block_id ) + { + fc::from_variant( block_id, my->last_received_remote_head ); + } ); return; - } catch (const fc::exception& e) { + } + catch (const fc::exception& e) + { elog("Error during connection: ${e}", ("e", e.to_detail_string())); } fc::async([this]{connection_failed();}); @@ -130,7 +150,7 @@ void delayed_node_plugin::plugin_startup() void delayed_node_plugin::connection_failed() { elog("Connection to trusted node failed; retrying in 5 seconds..."); - fc::schedule([this]{plugin_startup();}, fc::time_point::now() + fc::seconds(5)); + fc::schedule([this]{connect();}, fc::time_point::now() + fc::seconds(5)); } } } diff --git a/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp b/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp index c25b1203..3865411e 100644 --- a/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp +++ b/libraries/plugins/delayed_node/include/graphene/delayed_node/delayed_node_plugin.hpp @@ -34,11 +34,12 @@ public: boost::program_options::options_description& cfg) override; virtual void plugin_initialize(const boost::program_options::variables_map& options) override; virtual void plugin_startup() override; + void mainloop(); protected: void connection_failed(); void connect(); - void sync_with_trusted_node(uint32_t remote_head_block_num); + void sync_with_trusted_node(); }; } } //graphene::account_history