From f2da1f4a5bc227cff305270df95476e79692a847 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Wed, 30 Jan 2019 11:46:17 -0300 Subject: [PATCH] Merge pull request #1541 from oxarbitrage/es_objects_start_after_block add es-objects-start-es-after-block option --- .../elasticsearch/elasticsearch_plugin.cpp | 4 +- libraries/plugins/es_objects/es_objects.cpp | 164 +++++++++--------- 2 files changed, 86 insertions(+), 82 deletions(-) diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index 23cb31aa..1ca911f9 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -59,7 +59,7 @@ class elasticsearch_plugin_impl std::string _elasticsearch_basic_auth = ""; std::string _elasticsearch_index_prefix = "bitshares-"; bool _elasticsearch_operation_object = false; - uint32_t _elasticsearch_start_es_after_block = 0; // disabled + uint32_t _elasticsearch_start_es_after_block = 0; CURL *curl; // curl handler vector bulk_lines; // vector of op lines vector prepare; @@ -254,7 +254,7 @@ bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account const auto &stats_obj = getStatsObject(account_id); const auto &ath = addNewEntry(stats_obj, account_id, oho); growStats(stats_obj, ath); - if(_elasticsearch_start_es_after_block == 0 || block_number > _elasticsearch_start_es_after_block) { + if(block_number > _elasticsearch_start_es_after_block) { createBulkLine(ath); prepareBulk(ath.id); } diff --git a/libraries/plugins/es_objects/es_objects.cpp b/libraries/plugins/es_objects/es_objects.cpp index 9896772d..58517349 100644 --- a/libraries/plugins/es_objects/es_objects.cpp +++ b/libraries/plugins/es_objects/es_objects.cpp @@ -63,6 +63,7 @@ class es_objects_plugin_impl bool _es_objects_limit_orders = true; bool _es_objects_asset_bitasset = true; std::string _es_objects_index_prefix = "objects-"; + uint32_t _es_objects_start_es_after_block = 0; CURL *curl; // curl handler vector bulk; vector prepare; @@ -84,88 +85,87 @@ bool es_objects_plugin_impl::index_database( const vector& ids, block_time = db.head_block_time(); block_number = db.head_block_num(); - // check if we are in replay or in sync and change number of bulk documents accordingly - uint32_t limit_documents = 0; - if((fc::time_point::now() - block_time) < fc::seconds(30)) - limit_documents = _es_objects_bulk_sync; - else - limit_documents = _es_objects_bulk_replay; + if(block_number > _es_objects_start_es_after_block) { - for(auto const& value: ids) { - if(value.is() && _es_objects_proposals) { - auto obj = db.find_object(value); - auto p = static_cast(obj); - if(p != nullptr) { - if(action == "delete") - remove_from_database(p->id, "proposal"); - else - prepareTemplate(*p, "proposal"); - } - } - else if(value.is() && _es_objects_accounts) { - auto obj = db.find_object(value); - auto a = static_cast(obj); - if(a != nullptr) { - if(action == "delete") - remove_from_database(a->id, "account"); - else - prepareTemplate(*a, "account"); - } - } - else if(value.is() && _es_objects_assets) { - auto obj = db.find_object(value); - auto a = static_cast(obj); - if(a != nullptr) { - if(action == "delete") - remove_from_database(a->id, "asset"); - else - prepareTemplate(*a, "asset"); - } - } - else if(value.is() && _es_objects_balances) { - auto obj = db.find_object(value); - auto b = static_cast(obj); - if(b != nullptr) { - if(action == "delete") - remove_from_database(b->id, "balance"); - else - prepareTemplate(*b, "balance"); - } - } - else if(value.is() && _es_objects_limit_orders) { - auto obj = db.find_object(value); - auto l = static_cast(obj); - if(l != nullptr) { - if(action == "delete") - remove_from_database(l->id, "limitorder"); - else - prepareTemplate(*l, "limitorder"); - } - } - else if(value.is() && _es_objects_asset_bitasset) { - auto obj = db.find_object(value); - auto ba = static_cast(obj); - if(ba != nullptr) { - if(action == "delete") - remove_from_database(ba->id, "bitasset"); - else - prepareTemplate(*ba, "bitasset"); - } - } - } - - if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech - - graphene::utilities::ES es; - es.curl = curl; - es.bulk_lines = bulk; - es.elasticsearch_url = _es_objects_elasticsearch_url; - es.auth = _es_objects_auth; - - if(!graphene::utilities::SendBulk(es)) - return false; + // check if we are in replay or in sync and change number of bulk documents accordingly + uint32_t limit_documents = 0; + if ((fc::time_point::now() - block_time) < fc::seconds(30)) + limit_documents = _es_objects_bulk_sync; else - bulk.clear(); + limit_documents = _es_objects_bulk_replay; + + + for (auto const &value: ids) { + if (value.is() && _es_objects_proposals) { + auto obj = db.find_object(value); + auto p = static_cast(obj); + if (p != nullptr) { + if (action == "delete") + remove_from_database(p->id, "proposal"); + else + prepareTemplate(*p, "proposal"); + } + } else if (value.is() && _es_objects_accounts) { + auto obj = db.find_object(value); + auto a = static_cast(obj); + if (a != nullptr) { + if (action == "delete") + remove_from_database(a->id, "account"); + else + prepareTemplate(*a, "account"); + } + } else if (value.is() && _es_objects_assets) { + auto obj = db.find_object(value); + auto a = static_cast(obj); + if (a != nullptr) { + if (action == "delete") + remove_from_database(a->id, "asset"); + else + prepareTemplate(*a, "asset"); + } + } else if (value.is() && _es_objects_balances) { + auto obj = db.find_object(value); + auto b = static_cast(obj); + if (b != nullptr) { + if (action == "delete") + remove_from_database(b->id, "balance"); + else + prepareTemplate(*b, "balance"); + } + } else if (value.is() && _es_objects_limit_orders) { + auto obj = db.find_object(value); + auto l = static_cast(obj); + if (l != nullptr) { + if (action == "delete") + remove_from_database(l->id, "limitorder"); + else + prepareTemplate(*l, "limitorder"); + } + } else if (value.is() && _es_objects_asset_bitasset) { + auto obj = db.find_object(value); + auto ba = static_cast(obj); + if (ba != nullptr) { + if (action == "delete") + remove_from_database(ba->id, "bitasset"); + else + prepareTemplate(*ba, "bitasset"); + } + } + } + + if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech + + graphene::utilities::ES es; + es.curl = curl; + es.bulk_lines = bulk; + es.elasticsearch_url = _es_objects_elasticsearch_url; + es.auth = _es_objects_auth; + + if (!graphene::utilities::SendBulk(es)) + return false; + else + bulk.clear(); + } } return true; @@ -257,6 +257,7 @@ void es_objects_plugin::plugin_set_program_options( ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data(true)") ("es-objects-index-prefix", boost::program_options::value(), "Add a prefix to the index(objects-)") ("es-objects-keep-only-current", boost::program_options::value(), "Keep only current state of the objects(true)") + ("es-objects-start-es-after-block", boost::program_options::value(), "Start doing ES job after block(0)") ; cfg.add(cli); } @@ -319,6 +320,9 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable if (options.count("es-objects-keep-only-current")) { my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as(); } + if (options.count("es-objects-start-es-after-block")) { + my->_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as(); + } } void es_objects_plugin::plugin_startup()