Merge pull request #1541 from oxarbitrage/es_objects_start_after_block
add es-objects-start-es-after-block option
This commit is contained in:
parent
837b665ef9
commit
f2da1f4a5b
2 changed files with 86 additions and 82 deletions
|
|
@ -59,7 +59,7 @@ class elasticsearch_plugin_impl
|
||||||
std::string _elasticsearch_basic_auth = "";
|
std::string _elasticsearch_basic_auth = "";
|
||||||
std::string _elasticsearch_index_prefix = "bitshares-";
|
std::string _elasticsearch_index_prefix = "bitshares-";
|
||||||
bool _elasticsearch_operation_object = false;
|
bool _elasticsearch_operation_object = false;
|
||||||
uint32_t _elasticsearch_start_es_after_block = 0; // disabled
|
uint32_t _elasticsearch_start_es_after_block = 0;
|
||||||
CURL *curl; // curl handler
|
CURL *curl; // curl handler
|
||||||
vector <string> bulk_lines; // vector of op lines
|
vector <string> bulk_lines; // vector of op lines
|
||||||
vector<std::string> prepare;
|
vector<std::string> prepare;
|
||||||
|
|
@ -254,7 +254,7 @@ bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account
|
||||||
const auto &stats_obj = getStatsObject(account_id);
|
const auto &stats_obj = getStatsObject(account_id);
|
||||||
const auto &ath = addNewEntry(stats_obj, account_id, oho);
|
const auto &ath = addNewEntry(stats_obj, account_id, oho);
|
||||||
growStats(stats_obj, ath);
|
growStats(stats_obj, ath);
|
||||||
if(_elasticsearch_start_es_after_block == 0 || block_number > _elasticsearch_start_es_after_block) {
|
if(block_number > _elasticsearch_start_es_after_block) {
|
||||||
createBulkLine(ath);
|
createBulkLine(ath);
|
||||||
prepareBulk(ath.id);
|
prepareBulk(ath.id);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -63,6 +63,7 @@ class es_objects_plugin_impl
|
||||||
bool _es_objects_limit_orders = true;
|
bool _es_objects_limit_orders = true;
|
||||||
bool _es_objects_asset_bitasset = true;
|
bool _es_objects_asset_bitasset = true;
|
||||||
std::string _es_objects_index_prefix = "objects-";
|
std::string _es_objects_index_prefix = "objects-";
|
||||||
|
uint32_t _es_objects_start_es_after_block = 0;
|
||||||
CURL *curl; // curl handler
|
CURL *curl; // curl handler
|
||||||
vector <std::string> bulk;
|
vector <std::string> bulk;
|
||||||
vector<std::string> prepare;
|
vector<std::string> prepare;
|
||||||
|
|
@ -84,88 +85,87 @@ bool es_objects_plugin_impl::index_database( const vector<object_id_type>& ids,
|
||||||
block_time = db.head_block_time();
|
block_time = db.head_block_time();
|
||||||
block_number = db.head_block_num();
|
block_number = db.head_block_num();
|
||||||
|
|
||||||
// check if we are in replay or in sync and change number of bulk documents accordingly
|
if(block_number > _es_objects_start_es_after_block) {
|
||||||
uint32_t limit_documents = 0;
|
|
||||||
if((fc::time_point::now() - block_time) < fc::seconds(30))
|
|
||||||
limit_documents = _es_objects_bulk_sync;
|
|
||||||
else
|
|
||||||
limit_documents = _es_objects_bulk_replay;
|
|
||||||
|
|
||||||
for(auto const& value: ids) {
|
// check if we are in replay or in sync and change number of bulk documents accordingly
|
||||||
if(value.is<proposal_object>() && _es_objects_proposals) {
|
uint32_t limit_documents = 0;
|
||||||
auto obj = db.find_object(value);
|
if ((fc::time_point::now() - block_time) < fc::seconds(30))
|
||||||
auto p = static_cast<const proposal_object*>(obj);
|
limit_documents = _es_objects_bulk_sync;
|
||||||
if(p != nullptr) {
|
|
||||||
if(action == "delete")
|
|
||||||
remove_from_database(p->id, "proposal");
|
|
||||||
else
|
|
||||||
prepareTemplate<proposal_object>(*p, "proposal");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if(value.is<account_object>() && _es_objects_accounts) {
|
|
||||||
auto obj = db.find_object(value);
|
|
||||||
auto a = static_cast<const account_object*>(obj);
|
|
||||||
if(a != nullptr) {
|
|
||||||
if(action == "delete")
|
|
||||||
remove_from_database(a->id, "account");
|
|
||||||
else
|
|
||||||
prepareTemplate<account_object>(*a, "account");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if(value.is<asset_object>() && _es_objects_assets) {
|
|
||||||
auto obj = db.find_object(value);
|
|
||||||
auto a = static_cast<const asset_object*>(obj);
|
|
||||||
if(a != nullptr) {
|
|
||||||
if(action == "delete")
|
|
||||||
remove_from_database(a->id, "asset");
|
|
||||||
else
|
|
||||||
prepareTemplate<asset_object>(*a, "asset");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if(value.is<account_balance_object>() && _es_objects_balances) {
|
|
||||||
auto obj = db.find_object(value);
|
|
||||||
auto b = static_cast<const account_balance_object*>(obj);
|
|
||||||
if(b != nullptr) {
|
|
||||||
if(action == "delete")
|
|
||||||
remove_from_database(b->id, "balance");
|
|
||||||
else
|
|
||||||
prepareTemplate<account_balance_object>(*b, "balance");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if(value.is<limit_order_object>() && _es_objects_limit_orders) {
|
|
||||||
auto obj = db.find_object(value);
|
|
||||||
auto l = static_cast<const limit_order_object*>(obj);
|
|
||||||
if(l != nullptr) {
|
|
||||||
if(action == "delete")
|
|
||||||
remove_from_database(l->id, "limitorder");
|
|
||||||
else
|
|
||||||
prepareTemplate<limit_order_object>(*l, "limitorder");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if(value.is<asset_bitasset_data_object>() && _es_objects_asset_bitasset) {
|
|
||||||
auto obj = db.find_object(value);
|
|
||||||
auto ba = static_cast<const asset_bitasset_data_object*>(obj);
|
|
||||||
if(ba != nullptr) {
|
|
||||||
if(action == "delete")
|
|
||||||
remove_from_database(ba->id, "bitasset");
|
|
||||||
else
|
|
||||||
prepareTemplate<asset_bitasset_data_object>(*ba, "bitasset");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech
|
|
||||||
|
|
||||||
graphene::utilities::ES es;
|
|
||||||
es.curl = curl;
|
|
||||||
es.bulk_lines = bulk;
|
|
||||||
es.elasticsearch_url = _es_objects_elasticsearch_url;
|
|
||||||
es.auth = _es_objects_auth;
|
|
||||||
|
|
||||||
if(!graphene::utilities::SendBulk(es))
|
|
||||||
return false;
|
|
||||||
else
|
else
|
||||||
bulk.clear();
|
limit_documents = _es_objects_bulk_replay;
|
||||||
|
|
||||||
|
|
||||||
|
for (auto const &value: ids) {
|
||||||
|
if (value.is<proposal_object>() && _es_objects_proposals) {
|
||||||
|
auto obj = db.find_object(value);
|
||||||
|
auto p = static_cast<const proposal_object *>(obj);
|
||||||
|
if (p != nullptr) {
|
||||||
|
if (action == "delete")
|
||||||
|
remove_from_database(p->id, "proposal");
|
||||||
|
else
|
||||||
|
prepareTemplate<proposal_object>(*p, "proposal");
|
||||||
|
}
|
||||||
|
} else if (value.is<account_object>() && _es_objects_accounts) {
|
||||||
|
auto obj = db.find_object(value);
|
||||||
|
auto a = static_cast<const account_object *>(obj);
|
||||||
|
if (a != nullptr) {
|
||||||
|
if (action == "delete")
|
||||||
|
remove_from_database(a->id, "account");
|
||||||
|
else
|
||||||
|
prepareTemplate<account_object>(*a, "account");
|
||||||
|
}
|
||||||
|
} else if (value.is<asset_object>() && _es_objects_assets) {
|
||||||
|
auto obj = db.find_object(value);
|
||||||
|
auto a = static_cast<const asset_object *>(obj);
|
||||||
|
if (a != nullptr) {
|
||||||
|
if (action == "delete")
|
||||||
|
remove_from_database(a->id, "asset");
|
||||||
|
else
|
||||||
|
prepareTemplate<asset_object>(*a, "asset");
|
||||||
|
}
|
||||||
|
} else if (value.is<account_balance_object>() && _es_objects_balances) {
|
||||||
|
auto obj = db.find_object(value);
|
||||||
|
auto b = static_cast<const account_balance_object *>(obj);
|
||||||
|
if (b != nullptr) {
|
||||||
|
if (action == "delete")
|
||||||
|
remove_from_database(b->id, "balance");
|
||||||
|
else
|
||||||
|
prepareTemplate<account_balance_object>(*b, "balance");
|
||||||
|
}
|
||||||
|
} else if (value.is<limit_order_object>() && _es_objects_limit_orders) {
|
||||||
|
auto obj = db.find_object(value);
|
||||||
|
auto l = static_cast<const limit_order_object *>(obj);
|
||||||
|
if (l != nullptr) {
|
||||||
|
if (action == "delete")
|
||||||
|
remove_from_database(l->id, "limitorder");
|
||||||
|
else
|
||||||
|
prepareTemplate<limit_order_object>(*l, "limitorder");
|
||||||
|
}
|
||||||
|
} else if (value.is<asset_bitasset_data_object>() && _es_objects_asset_bitasset) {
|
||||||
|
auto obj = db.find_object(value);
|
||||||
|
auto ba = static_cast<const asset_bitasset_data_object *>(obj);
|
||||||
|
if (ba != nullptr) {
|
||||||
|
if (action == "delete")
|
||||||
|
remove_from_database(ba->id, "bitasset");
|
||||||
|
else
|
||||||
|
prepareTemplate<asset_bitasset_data_object>(*ba, "bitasset");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech
|
||||||
|
|
||||||
|
graphene::utilities::ES es;
|
||||||
|
es.curl = curl;
|
||||||
|
es.bulk_lines = bulk;
|
||||||
|
es.elasticsearch_url = _es_objects_elasticsearch_url;
|
||||||
|
es.auth = _es_objects_auth;
|
||||||
|
|
||||||
|
if (!graphene::utilities::SendBulk(es))
|
||||||
|
return false;
|
||||||
|
else
|
||||||
|
bulk.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -257,6 +257,7 @@ void es_objects_plugin::plugin_set_program_options(
|
||||||
("es-objects-asset-bitasset", boost::program_options::value<bool>(), "Store feed data(true)")
|
("es-objects-asset-bitasset", boost::program_options::value<bool>(), "Store feed data(true)")
|
||||||
("es-objects-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(objects-)")
|
("es-objects-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(objects-)")
|
||||||
("es-objects-keep-only-current", boost::program_options::value<bool>(), "Keep only current state of the objects(true)")
|
("es-objects-keep-only-current", boost::program_options::value<bool>(), "Keep only current state of the objects(true)")
|
||||||
|
("es-objects-start-es-after-block", boost::program_options::value<uint32_t>(), "Start doing ES job after block(0)")
|
||||||
;
|
;
|
||||||
cfg.add(cli);
|
cfg.add(cli);
|
||||||
}
|
}
|
||||||
|
|
@ -319,6 +320,9 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable
|
||||||
if (options.count("es-objects-keep-only-current")) {
|
if (options.count("es-objects-keep-only-current")) {
|
||||||
my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as<bool>();
|
my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as<bool>();
|
||||||
}
|
}
|
||||||
|
if (options.count("es-objects-start-es-after-block")) {
|
||||||
|
my->_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as<uint32_t>();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void es_objects_plugin::plugin_startup()
|
void es_objects_plugin::plugin_startup()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue