Update delayed node feature

This commit is contained in:
serkixenos 2022-09-07 13:57:17 +00:00
parent 9268c31ac4
commit 6a38fb2382
7 changed files with 30 additions and 345 deletions

View file

@ -917,6 +917,7 @@ void application::initialize(const fc::path &data_dir, const boost::program_opti
wanted.insert("accounts_list");
wanted.insert("affiliate_stats");
}
if (!wanted.count("delayed_node") && !wanted.count("witness")) // explicitly requested delayed_node functionality suppresses witness functions
wanted.insert("witness");
wanted.insert("bookie");
@ -949,7 +950,7 @@ void application::startup() {
}
std::shared_ptr<abstract_plugin> application::get_plugin(const string &name) const {
return my->_active_plugins[name];
return is_plugin_enabled(name) ? my->_active_plugins[name] : nullptr;
}
bool application::is_plugin_enabled(const string &name) const {

View file

@ -63,8 +63,24 @@ void delayed_node_plugin::plugin_set_program_options(bpo::options_description& c
void delayed_node_plugin::connect()
{
my->client_connection = std::make_shared<fc::rpc::websocket_api_connection>(my->client.connect(my->remote_endpoint), GRAPHENE_MAX_NESTED_OBJECTS);
fc::http::websocket_connection_ptr con;
try
{
con = my->client.connect(my->remote_endpoint);
}
catch( const fc::exception& e )
{
wlog("Error while connecting: ${e}", ("e", e.to_detail_string()));
connection_failed();
return;
}
my->client_connection = std::make_shared<fc::rpc::websocket_api_connection>(
con, GRAPHENE_NET_MAX_NESTED_OBJECTS );
my->database_api = my->client_connection->get_remote_api<graphene::app::database_api>(0);
my->database_api->set_block_applied_callback([this]( const fc::variant& block_id )
{
fc::from_variant( block_id, my->last_received_remote_head, GRAPHENE_MAX_NESTED_OBJECTS );
} );
my->client_connection_closed = my->client_connection->closed.connect([this] {
connection_failed();
});
@ -73,7 +89,9 @@ void delayed_node_plugin::connect()
void delayed_node_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
FC_ASSERT(options.count("trusted-node") > 0);
ilog("delayed_node_plugin: plugin_initialize() begin");
my->remote_endpoint = "ws://" + options.at("trusted-node").as<std::string>();
ilog("delayed_node_plugin: plugin_initialize() end");
}
void delayed_node_plugin::sync_with_trusted_node()
@ -100,8 +118,11 @@ void delayed_node_plugin::sync_with_trusted_node()
while( remote_dpo.last_irreversible_block_num > db.head_block_num() )
{
fc::optional<graphene::chain::signed_block> block = my->database_api->get_block( db.head_block_num()+1 );
// TODO: during sync, decouple requesting blocks from preprocessing + applying them
FC_ASSERT(block, "Trusted node claims it has blocks it doesn't actually have.");
ilog("Pushing block #${n}", ("n", block->block_num()));
// timur: failed to merge from bitshares, API n/a in peerplays
// db.precompute_parallel( *block, graphene::chain::database::skip_nothing ).wait();
db.push_block(*block);
synced_blocks++;
}
@ -136,24 +157,12 @@ void delayed_node_plugin::plugin_startup()
mainloop();
});
try
{
connect();
my->database_api->set_block_applied_callback([this]( const fc::variant& block_id )
{
fc::from_variant( block_id, my->last_received_remote_head, GRAPHENE_MAX_NESTED_OBJECTS );
} );
return;
}
catch (const fc::exception& e)
{
elog("Error during connection: ${e}", ("e", e.to_detail_string()));
}
fc::async([this]{connection_failed();});
}
void delayed_node_plugin::connection_failed()
{
my->last_received_remote_head = my->last_processed_remote_head;
elog("Connection to trusted node failed; retrying in 5 seconds...");
fc::schedule([this]{connect();}, fc::time_point::now() + fc::seconds(5));
}

View file

@ -4,7 +4,6 @@ if( BUILD_PEERPLAYS_PROGRAMS )
add_subdirectory( genesis_util )
add_subdirectory( witness_node )
add_subdirectory( debug_node )
add_subdirectory( delayed_node )
add_subdirectory( js_operation_serializer )
add_subdirectory( size_checker )
endif( BUILD_PEERPLAYS_PROGRAMS )

View file

@ -1,21 +0,0 @@
add_executable( delayed_node main.cpp )
if( UNIX AND NOT APPLE )
set(rt_library rt )
endif()
find_package( Gperftools QUIET )
if( GPERFTOOLS_FOUND )
message( STATUS "Found gperftools; compiling delayed_node with TCMalloc")
list( APPEND PLATFORM_SPECIFIC_LIBS tcmalloc )
endif()
target_link_libraries( delayed_node
PRIVATE graphene_app graphene_egenesis_full graphene_delayed_node ${PLATFORM_SPECIFIC_LIBS} )
install( TARGETS
delayed_node
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
)

View file

@ -1,305 +0,0 @@
/*
* Copyright (c) 2015 Cryptonomex, Inc., and contributors.
*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <graphene/app/application.hpp>
#include <graphene/delayed_node/delayed_node_plugin.hpp>
#include <graphene/account_history/account_history_plugin.hpp>
#include <graphene/market_history/market_history_plugin.hpp>
#include <fc/exception/exception.hpp>
#include <fc/thread/thread.hpp>
#include <fc/interprocess/signals.hpp>
#include <fc/log/console_appender.hpp>
#include <fc/log/file_appender.hpp>
#include <fc/log/logger.hpp>
#include <fc/log/logger_config.hpp>
#include <boost/filesystem.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <iostream>
#include <fstream>
#ifdef WIN32
# include <signal.h>
#else
# include <csignal>
#endif
using namespace graphene;
namespace bpo = boost::program_options;
void write_default_logging_config_to_stream(std::ostream& out);
fc::optional<fc::logging_config> load_logging_config_from_ini_file(const fc::path& config_ini_filename);
int main(int argc, char** argv) {
try {
app::application node;
bpo::options_description app_options("Graphene Delayed Node");
bpo::options_description cfg_options("Graphene Delayed Node");
app_options.add_options()
("help,h", "Print this help message and exit.")
("data-dir,d", bpo::value<boost::filesystem::path>()->default_value("delayed_node_data_dir"), "Directory containing databases, configuration file, etc.")
;
bpo::variables_map options;
bpo::options_description cli, cfg;
node.set_program_options(cli, cfg);
cfg_options.add(cfg);
cfg_options.add_options()
("plugins", bpo::value<std::string>()->default_value("delayed_node account_history market_history"),
"Space-separated list of plugins to activate");
auto delayed_plug = node.register_plugin<delayed_node::delayed_node_plugin>();
auto history_plug = node.register_plugin<account_history::account_history_plugin>();
auto market_history_plug = node.register_plugin<market_history::market_history_plugin>();
// add plugin options to config
try
{
bpo::options_description cli, cfg;
node.set_program_options(cli, cfg);
app_options.add(cli);
cfg_options.add(cfg);
bpo::store(bpo::parse_command_line(argc, argv, app_options), options);
}
catch (const boost::program_options::error& e)
{
std::cerr << "Error parsing command line: " << e.what() << "\n";
return 1;
}
if( options.count("help") )
{
std::cout << app_options << "\n";
return 0;
}
fc::path data_dir;
if( options.count("data-dir") )
{
data_dir = options["data-dir"].as<boost::filesystem::path>();
if( data_dir.is_relative() )
data_dir = fc::current_path() / data_dir;
}
fc::path config_ini_path = data_dir / "config.ini";
// Create config file if not already present
if( !fc::exists(config_ini_path) )
{
ilog("Writing new config file at ${path}", ("path", config_ini_path));
if( !fc::exists(data_dir) )
fc::create_directories(data_dir);
std::ofstream out_cfg(config_ini_path.preferred_string());
for( const boost::shared_ptr<bpo::option_description> od : cfg_options.options() )
{
if( !od->description().empty() )
out_cfg << "# " << od->description() << "\n";
boost::any store;
if( !od->semantic()->apply_default(store) )
out_cfg << "# " << od->long_name() << " = \n";
else {
auto example = od->format_parameter();
if( example.empty() )
// This is a boolean switch
out_cfg << od->long_name() << " = " << "false\n";
else {
// The string is formatted "arg (=<interesting part>)"
example.erase(0, 6);
example.erase(example.length()-1);
out_cfg << od->long_name() << " = " << example << "\n";
}
}
out_cfg << "\n";
}
write_default_logging_config_to_stream(out_cfg);
out_cfg.close();
// read the default logging config we just wrote out to the file and start using it
fc::optional<fc::logging_config> logging_config = load_logging_config_from_ini_file(config_ini_path);
if (logging_config)
fc::configure_logging(*logging_config);
}
// Parse configuration file
try {
bpo::store(bpo::parse_config_file<char>(config_ini_path.preferred_string().c_str(), cfg_options, true), options);
// try to get logging options from the config file.
try
{
fc::optional<fc::logging_config> logging_config = load_logging_config_from_ini_file(config_ini_path);
if (logging_config)
fc::configure_logging(*logging_config);
}
catch (const fc::exception&)
{
wlog("Error parsing logging config from config file ${config}, using default config", ("config", config_ini_path.preferred_string()));
}
bpo::notify(options);
} catch( const boost::program_options::error& e ) {
elog("Error parsing configuration file: ${e}", ("e", e.what()));
return 1;
}
if( !options.count("plugins") )
options.insert( std::make_pair( "plugins", bpo::variable_value(std::string("delayed_node account_history market_history"), true) ) );
node.initialize(data_dir, options);
node.initialize_plugins( options );
node.startup();
node.startup_plugins();
fc::promise<int>::ptr exit_promise = new fc::promise<int>("UNIX Signal Handler");
fc::set_signal_handler([&exit_promise](int signal) {
exit_promise->set_value(signal);
}, SIGINT);
ilog("Started delayed node on a chain with ${h} blocks.", ("h", node.chain_database()->head_block_num()));
ilog("Chain ID is ${id}", ("id", node.chain_database()->get_chain_id()) );
int signal = exit_promise->wait();
ilog("Exiting from signal ${n}", ("n", signal));
node.shutdown_plugins();
return 0;
} catch( const fc::exception& e ) {
elog("Exiting with error:\n${e}", ("e", e.to_detail_string()));
return 1;
}
}
// logging config is too complicated to be parsed by boost::program_options,
// so we do it by hand
//
// Currently, you can only specify the filenames and logging levels, which
// are all most users would want to change. At a later time, options can
// be added to control rotation intervals, compression, and other seldom-
// used features
void write_default_logging_config_to_stream(std::ostream& out)
{
out << "# declare an appender named \"stderr\" that writes messages to the console\n"
"[log.console_appender.stderr]\n"
"stream=std_error\n\n"
"# declare an appender named \"p2p\" that writes messages to p2p.log\n"
"[log.file_appender.p2p]\n"
"filename=logs/p2p/p2p.log\n"
"# filename can be absolute or relative to this config file\n\n"
"# route any messages logged to the default logger to the \"stderr\" logger we\n"
"# declared above, if they are info level are higher\n"
"[logger.default]\n"
"level=info\n"
"appenders=stderr\n\n"
"# route messages sent to the \"p2p\" logger to the p2p appender declared above\n"
"[logger.p2p]\n"
"level=info\n"
"appenders=p2p\n\n";
}
fc::optional<fc::logging_config> load_logging_config_from_ini_file(const fc::path& config_ini_filename)
{
try
{
fc::logging_config logging_config;
bool found_logging_config = false;
boost::property_tree::ptree config_ini_tree;
boost::property_tree::ini_parser::read_ini(config_ini_filename.preferred_string().c_str(), config_ini_tree);
for (const auto& section : config_ini_tree)
{
const std::string& section_name = section.first;
const boost::property_tree::ptree& section_tree = section.second;
const std::string console_appender_section_prefix = "log.console_appender.";
const std::string file_appender_section_prefix = "log.file_appender.";
const std::string logger_section_prefix = "logger.";
if (boost::starts_with(section_name, console_appender_section_prefix))
{
std::string console_appender_name = section_name.substr(console_appender_section_prefix.length());
std::string stream_name = section_tree.get<std::string>("stream");
// construct a default console appender config here
// stdout/stderr will be taken from ini file, everything else hard-coded here
fc::console_appender::config console_appender_config;
console_appender_config.level_colors.emplace_back(
fc::console_appender::level_color(fc::log_level::debug,
fc::console_appender::color::green));
console_appender_config.level_colors.emplace_back(
fc::console_appender::level_color(fc::log_level::warn,
fc::console_appender::color::brown));
console_appender_config.level_colors.emplace_back(
fc::console_appender::level_color(fc::log_level::error,
fc::console_appender::color::cyan));
console_appender_config.stream = fc::variant(stream_name, 1).as<fc::console_appender::stream::type>(1);
logging_config.appenders.push_back(fc::appender_config(console_appender_name, "console", fc::variant(console_appender_config, GRAPHENE_MAX_NESTED_OBJECTS)));
found_logging_config = true;
}
else if (boost::starts_with(section_name, file_appender_section_prefix))
{
std::string file_appender_name = section_name.substr(file_appender_section_prefix.length());
fc::path file_name = section_tree.get<std::string>("filename");
if (file_name.is_relative())
file_name = fc::absolute(config_ini_filename).parent_path() / file_name;
// construct a default file appender config here
// filename will be taken from ini file, everything else hard-coded here
fc::file_appender::config file_appender_config;
file_appender_config.filename = file_name;
file_appender_config.flush = true;
file_appender_config.rotate = true;
file_appender_config.rotation_interval = fc::hours(1);
file_appender_config.rotation_limit = fc::days(1);
logging_config.appenders.push_back(fc::appender_config(file_appender_name, "file", fc::variant(file_appender_config, GRAPHENE_MAX_NESTED_OBJECTS)));
found_logging_config = true;
}
else if (boost::starts_with(section_name, logger_section_prefix))
{
std::string logger_name = section_name.substr(logger_section_prefix.length());
std::string level_string = section_tree.get<std::string>("level");
std::string appenders_string = section_tree.get<std::string>("appenders");
fc::logger_config logger_config(logger_name);
logger_config.level = fc::variant(level_string, 1).as<fc::log_level>(1);
boost::split(logger_config.appenders, appenders_string,
boost::is_any_of(" ,"),
boost::token_compress_on);
logging_config.loggers.push_back(logger_config);
found_logging_config = true;
}
}
if (found_logging_config)
return logging_config;
else
return fc::optional<fc::logging_config>();
}
FC_RETHROW_EXCEPTIONS(warn, "")
}

View file

@ -11,7 +11,7 @@ endif()
# We have to link against graphene_debug_witness because deficiency in our API infrastructure doesn't allow plugins to be fully abstracted #246
target_link_libraries( witness_node
PRIVATE graphene_app graphene_egenesis_full graphene_snapshot graphene_witness peerplays_sidechain ${PLATFORM_SPECIFIC_LIBS} )
PRIVATE graphene_app graphene_egenesis_full graphene_snapshot graphene_delayed_node graphene_witness peerplays_sidechain ${PLATFORM_SPECIFIC_LIBS} )
# also add dependencies to graphene_generate_genesis graphene_generate_uia_sharedrop_genesis if you want those plugins
install( TARGETS

View file

@ -36,6 +36,7 @@
#include <graphene/affiliate_stats/affiliate_stats_plugin.hpp>
#include <graphene/bookie/bookie_plugin.hpp>
#include <graphene/peerplays_sidechain/peerplays_sidechain_plugin.hpp>
#include <graphene/delayed_node/delayed_node_plugin.hpp>
#include <graphene/utilities/git_revision.hpp>
#include <graphene/snapshot/snapshot.hpp>
@ -90,6 +91,7 @@ int main(int argc, char** argv) {
auto bookie_plug = node->register_plugin<bookie::bookie_plugin>();
auto peerplays_sidechain = node->register_plugin<peerplays_sidechain::peerplays_sidechain_plugin>();
auto snapshot_plug = node->register_plugin<snapshot_plugin::snapshot_plugin>();
auto delayed_plug = node->register_plugin<delayed_node::delayed_node_plugin>();
// add plugin options to config
try