diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index b69ff64a..23cb31aa 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -58,6 +58,8 @@ class elasticsearch_plugin_impl bool _elasticsearch_visitor = false; std::string _elasticsearch_basic_auth = ""; std::string _elasticsearch_index_prefix = "bitshares-"; + bool _elasticsearch_operation_object = false; + uint32_t _elasticsearch_start_es_after_block = 0; // disabled CURL *curl; // curl handler vector bulk_lines; // vector of op lines vector prepare; @@ -73,7 +75,7 @@ class elasticsearch_plugin_impl std::string index_name; bool is_sync = false; private: - bool add_elasticsearch( const account_id_type account_id, const optional& oho, const signed_block& b ); + bool add_elasticsearch( const account_id_type account_id, const optional& oho, const uint32_t block_number ); const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj, const account_id_type account_id, const optional & oho); @@ -162,7 +164,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b for( auto& account_id : impacted ) { - if(!add_elasticsearch( account_id, oho, b )) + if(!add_elasticsearch( account_id, oho, b.block_num() )) return false; } } @@ -247,13 +249,15 @@ void elasticsearch_plugin_impl::doVisitor(const optional & oho, - const signed_block& b) + const uint32_t block_number) { const auto &stats_obj = getStatsObject(account_id); const auto &ath = addNewEntry(stats_obj, account_id, oho); growStats(stats_obj, ath); - createBulkLine(ath); - prepareBulk(ath.id); + if(_elasticsearch_start_es_after_block == 0 || block_number > _elasticsearch_start_es_after_block) { + createBulkLine(ath); + prepareBulk(ath.id); + } cleanObjects(ath, account_id); if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech @@ -392,6 +396,8 @@ void elasticsearch_plugin::plugin_set_program_options( ("elasticsearch-visitor", boost::program_options::value(), "Use visitor to index additional data(slows down the replay)") ("elasticsearch-basic-auth", boost::program_options::value(), "Pass basic auth to elasticsearch database ") ("elasticsearch-index-prefix", boost::program_options::value(), "Add a prefix to the index(bitshares-)") + ("elasticsearch-operation-object", boost::program_options::value(), "Save operation as object(false)") + ("elasticsearch-start-es-after-block", boost::program_options::value(), "Start doing ES job after block(0)") ; cfg.add(cli); } @@ -399,11 +405,10 @@ void elasticsearch_plugin::plugin_set_program_options( void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options) { database().applied_block.connect( [&]( const signed_block& b) { - if(!my->update_account_histories(b)) - { + if (!my->update_account_histories(b)) FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); - } } ); + my->_oho_index = database().add_index< primary_index< operation_history_index > >(); database().add_index< primary_index< account_transaction_history_index > >(); @@ -425,6 +430,12 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia if (options.count("elasticsearch-index-prefix")) { my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as(); } + if (options.count("elasticsearch-operation-object")) { + my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as(); + } + if (options.count("elasticsearch-start-es-after-block")) { + my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as(); + } } void elasticsearch_plugin::plugin_startup()