From 2d19aa3de10ae145d9d4644379ff2502a91c6cc0 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Mon, 15 Oct 2018 14:42:45 -0300 Subject: [PATCH] Merge pull request #1271 from oxarbitrage/es_objects refine es_objects plugin --- libraries/plugins/es_objects/es_objects.cpp | 271 ++++++++++-------- .../graphene/es_objects/es_objects.hpp | 48 +++- 2 files changed, 183 insertions(+), 136 deletions(-) diff --git a/libraries/plugins/es_objects/es_objects.cpp b/libraries/plugins/es_objects/es_objects.cpp index 5f04e40c..82e6ff23 100644 --- a/libraries/plugins/es_objects/es_objects.cpp +++ b/libraries/plugins/es_objects/es_objects.cpp @@ -47,13 +47,14 @@ class es_objects_plugin_impl { curl = curl_easy_init(); } virtual ~es_objects_plugin_impl(); - bool updateDatabase( const vector& ids , bool isNew); + bool index_database( const vector& ids, std::string action); + void remove_from_database( object_id_type id, std::string index); es_objects_plugin& _self; std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; std::string _es_objects_auth = ""; - uint32_t _es_objects_bulk_replay = 5000; - uint32_t _es_objects_bulk_sync = 10; + uint32_t _es_objects_bulk_replay = 10000; + uint32_t _es_objects_bulk_sync = 100; bool _es_objects_proposals = true; bool _es_objects_accounts = true; bool _es_objects_assets = true; @@ -64,27 +65,27 @@ class es_objects_plugin_impl CURL *curl; // curl handler vector bulk; vector prepare; - map bitassets; - map accounts; - map proposals; - map assets; - map balances; - map limit_orders; + + bool _es_objects_keep_only_current = true; + + uint32_t block_number; + fc::time_point_sec block_time; + private: - void PrepareProposal(const proposal_object& proposal_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareAccount(const account_object& account_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareAsset(const asset_object& asset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareBalance(const balance_object& balance_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareLimit(const limit_order_object& limit_object, const fc::time_point_sec& block_time, const uint32_t& block_number); - void PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); + void prepare_proposal(const proposal_object& proposal_object); + void prepare_account(const account_object& account_object); + void prepare_asset(const asset_object& asset_object); + void prepare_balance(const account_balance_object& account_balance_object); + void prepare_limit(const limit_order_object& limit_object); + void prepare_bitasset(const asset_bitasset_data_object& bitasset_object); }; -bool es_objects_plugin_impl::updateDatabase( const vector& ids , bool isNew) +bool es_objects_plugin_impl::index_database( const vector& ids, std::string action) { graphene::chain::database &db = _self.database(); - const fc::time_point_sec block_time = db.head_block_time(); - const uint32_t block_number = db.head_block_num(); + block_time = db.head_block_time(); + block_number = db.head_block_num(); // check if we are in replay or in sync and change number of bulk documents accordingly uint32_t limit_documents = 0; @@ -97,38 +98,62 @@ bool es_objects_plugin_impl::updateDatabase( const vector& ids , if(value.is() && _es_objects_proposals) { auto obj = db.find_object(value); auto p = static_cast(obj); - if(p != nullptr) - PrepareProposal(*p, block_time, block_number); + if(p != nullptr) { + if(action == "delete") + remove_from_database(p->id, "proposal"); + else + prepare_proposal(*p); + } } else if(value.is() && _es_objects_accounts) { auto obj = db.find_object(value); auto a = static_cast(obj); - if(a != nullptr) - PrepareAccount(*a, block_time, block_number); + if(a != nullptr) { + if(action == "delete") + remove_from_database(a->id, "account"); + else + prepare_account(*a); + } } else if(value.is() && _es_objects_assets) { auto obj = db.find_object(value); auto a = static_cast(obj); - if(a != nullptr) - PrepareAsset(*a, block_time, block_number); + if(a != nullptr) { + if(action == "delete") + remove_from_database(a->id, "asset"); + else + prepare_asset(*a); + } } - else if(value.is() && _es_objects_balances) { + else if(value.is() && _es_objects_balances) { auto obj = db.find_object(value); - auto b = static_cast(obj); - if(b != nullptr) - PrepareBalance(*b, block_time, block_number); + auto b = static_cast(obj); + if(b != nullptr) { + if(action == "delete") + remove_from_database(b->id, "balance"); + else + prepare_balance(*b); + } } else if(value.is() && _es_objects_limit_orders) { auto obj = db.find_object(value); auto l = static_cast(obj); - if(l != nullptr) - PrepareLimit(*l, block_time, block_number); + if(l != nullptr) { + if(action == "delete") + remove_from_database(l->id, "limitorder"); + else + prepare_limit(*l); + } } else if(value.is() && _es_objects_asset_bitasset) { auto obj = db.find_object(value); auto ba = static_cast(obj); - if(ba != nullptr) - PrepareBitAsset(*ba, block_time, block_number); + if(ba != nullptr) { + if(action == "delete") + remove_from_database(ba->id, "bitasset"); + else + prepare_bitasset(*ba); + } } } @@ -149,8 +174,23 @@ bool es_objects_plugin_impl::updateDatabase( const vector& ids , return true; } -void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::remove_from_database( object_id_type id, std::string index) +{ + if(_es_objects_keep_only_current) + { + fc::mutable_variant_object delete_line; + delete_line["_id"] = string(id); + delete_line["_index"] = _es_objects_index_prefix + index; + delete_line["_type"] = "data"; + fc::mutable_variant_object final_delete_line; + final_delete_line["delete"] = delete_line; + prepare.push_back(fc::json::to_string(final_delete_line)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); + prepare.clear(); + } +} + +void es_objects_plugin_impl::prepare_proposal(const proposal_object& proposal_object) { proposal_struct prop; prop.object_id = proposal_object.id; @@ -165,27 +205,22 @@ void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_obj prop.available_key_approvals = fc::json::to_string(proposal_object.available_key_approvals); prop.proposer = proposal_object.proposer; - auto it = proposals.find(proposal_object.id); - if(it == proposals.end()) - proposals[proposal_object.id] = prop; - else { - if(it->second == prop) return; - else proposals[proposal_object.id] = prop; - } - std::string data = fc::json::to_string(prop); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "proposal"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(prop.object_id); + } - prepare = graphene::utilities::createBulk(bulk_header, data); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareAccount(const account_object& account_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_account(const account_object& account_object) { account_struct acct; acct.object_id = account_object.id; @@ -206,28 +241,24 @@ void es_objects_plugin_impl::PrepareAccount(const account_object& account_object acct.active_key_auths = fc::json::to_string(account_object.active.key_auths); acct.active_address_auths = fc::json::to_string(account_object.active.address_auths); acct.voting_account = account_object.options.voting_account; - - auto it = accounts.find(account_object.id); - if(it == accounts.end()) - accounts[account_object.id] = acct; - else { - if(it->second == acct) return; - else accounts[account_object.id] = acct; - } + acct.votes = fc::json::to_string(account_object.options.votes); std::string data = fc::json::to_string(acct); fc::mutable_variant_object bulk_header; - bulk_header["_index"] = _es_objects_index_prefix + "acount"; + bulk_header["_index"] = _es_objects_index_prefix + "account"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(acct.object_id); + } - prepare = graphene::utilities::createBulk(bulk_header, data); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_asset(const asset_object& asset_object) { asset_struct asset; asset.object_id = asset_object.id; @@ -239,56 +270,47 @@ void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object, asset.dynamic_asset_data_id = asset_object.dynamic_asset_data_id; asset.bitasset_data_id = asset_object.bitasset_data_id; - auto it = assets.find(asset_object.id); - if(it == assets.end()) - assets[asset_object.id] = asset; - else { - if(it->second == asset) return; - else assets[asset_object.id] = asset; - } - std::string data = fc::json::to_string(asset); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "asset"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(asset.object_id); + } - prepare = graphene::utilities::createBulk(bulk_header, data); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareBalance(const balance_object& balance_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_balance(const account_balance_object& account_balance_object) { balance_struct balance; - balance.object_id = balance_object.id; + balance.object_id = account_balance_object.id; balance.block_time = block_time; - balance.block_number = block_number;balance.owner = balance_object.owner; - balance.asset_id = balance_object.balance.asset_id; - balance.amount = balance_object.balance.amount; - - auto it = balances.find(balance_object.id); - if(it == balances.end()) - balances[balance_object.id] = balance; - else { - if(it->second == balance) return; - else balances[balance_object.id] = balance; - } + balance.block_number = block_number; + balance.owner = account_balance_object.owner; + balance.asset_type = account_balance_object.asset_type; + balance.balance = account_balance_object.balance; std::string data = fc::json::to_string(balance); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "balance"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(balance.object_id); + } - prepare = graphene::utilities::createBulk(bulk_header, data); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_limit(const limit_order_object& limit_object) { limit_order_struct limit; limit.object_id = limit_object.id; @@ -300,27 +322,22 @@ void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object limit.sell_price = limit_object.sell_price; limit.deferred_fee = limit_object.deferred_fee; - auto it = limit_orders.find(limit_object.id); - if(it == limit_orders.end()) - limit_orders[limit_object.id] = limit; - else { - if(it->second == limit) return; - else limit_orders[limit_object.id] = limit; - } - std::string data = fc::json::to_string(limit); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "limitorder"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(limit.object_id); + } - prepare = graphene::utilities::createBulk(bulk_header, data); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } -void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, - const fc::time_point_sec& block_time, const uint32_t& block_number) +void es_objects_plugin_impl::prepare_bitasset(const asset_bitasset_data_object& bitasset_object) { if(!bitasset_object.is_prediction_market) { @@ -331,22 +348,18 @@ void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& b bitasset.current_feed = fc::json::to_string(bitasset_object.current_feed); bitasset.current_feed_publication_time = bitasset_object.current_feed_publication_time; - auto it = bitassets.find(bitasset_object.id); - if(it == bitassets.end()) - bitassets[bitasset_object.id] = bitasset; - else { - if(it->second == bitasset) return; - else bitassets[bitasset_object.id] = bitasset; - } - std::string data = fc::json::to_string(bitasset); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "bitasset"; bulk_header["_type"] = "data"; + if(_es_objects_keep_only_current) + { + bulk_header["_id"] = string(bitasset.object_id); + } - prepare = graphene::utilities::createBulk(bulk_header, data); - bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare = graphene::utilities::createBulk(bulk_header, std::move(data)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk)); prepare.clear(); } } @@ -382,17 +395,18 @@ void es_objects_plugin::plugin_set_program_options( ) { cli.add_options() - ("es-objects-elasticsearch-url", boost::program_options::value(), "Elasticsearch node url") - ("es-objects-auth", boost::program_options::value(), "Basic auth username:password") - ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(5000)") - ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a syncronied chain(10)") - ("es-objects-proposals", boost::program_options::value(), "Store proposal objects") - ("es-objects-accounts", boost::program_options::value(), "Store account objects") - ("es-objects-assets", boost::program_options::value(), "Store asset objects") - ("es-objects-balances", boost::program_options::value(), "Store balances objects") - ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects") - ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data") + ("es-objects-elasticsearch-url", boost::program_options::value(), "Elasticsearch node url(http://localhost:9200/)") + ("es-objects-auth", boost::program_options::value(), "Basic auth username:password('')") + ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(10000)") + ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a synchronized chain(100)") + ("es-objects-proposals", boost::program_options::value(), "Store proposal objects(true)") + ("es-objects-accounts", boost::program_options::value(), "Store account objects(true)") + ("es-objects-assets", boost::program_options::value(), "Store asset objects(true)") + ("es-objects-balances", boost::program_options::value(), "Store balances objects(true)") + ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects(true)") + ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data(true)") ("es-objects-index-prefix", boost::program_options::value(), "Add a prefix to the index(objects-)") + ("es-objects-keep-only-current", boost::program_options::value(), "Keep only current state of the objects(true)") ; cfg.add(cli); } @@ -400,17 +414,25 @@ void es_objects_plugin::plugin_set_program_options( void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options) { database().new_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { - if(!my->updateDatabase(ids, 1)) + if(!my->index_database(ids, "create")) { - FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); + FC_THROW_EXCEPTION(fc::exception, "Error creating object from ES database, we are going to keep trying."); } }); database().changed_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { - if(!my->updateDatabase(ids, 0)) + if(!my->index_database(ids, "update")) { - FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); + FC_THROW_EXCEPTION(fc::exception, "Error updating object from ES database, we are going to keep trying."); } }); + database().removed_objects.connect([this](const vector& ids, const vector& objs, const flat_set& impacted_accounts) { + if(!my->index_database(ids, "delete")) + { + FC_THROW_EXCEPTION(fc::exception, "Error deleting object from ES database, we are going to keep trying."); + } + }); + + if (options.count("es-objects-elasticsearch-url")) { my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); } @@ -444,6 +466,9 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable if (options.count("es-objects-index-prefix")) { my->_es_objects_index_prefix = options["es-objects-index-prefix"].as(); } + if (options.count("es-objects-keep-only-current")) { + my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as(); + } } void es_objects_plugin::plugin_startup() diff --git a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp index d9c38711..886129c8 100644 --- a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp +++ b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp @@ -102,6 +102,7 @@ struct account_struct { string active_key_auths; string active_address_auths; account_id_type voting_account; + string votes; friend bool operator==(const account_struct& l, const account_struct& r) { @@ -109,11 +110,11 @@ struct account_struct { l.lifetime_referrer, l.network_fee_percentage, l.lifetime_referrer_fee_percentage, l.referrer_rewards_percentage, l.name, l.owner_account_auths, l.owner_key_auths, l.owner_address_auths, l.active_account_auths, l.active_key_auths, l.active_address_auths, - l.voting_account) == std::tie(r.object_id, r.block_time, r.block_number, r.membership_expiration_date, r.registrar, r.referrer, + l.voting_account, l.votes) == std::tie(r.object_id, r.block_time, r.block_number, r.membership_expiration_date, r.registrar, r.referrer, r.lifetime_referrer, r.network_fee_percentage, r.lifetime_referrer_fee_percentage, r.referrer_rewards_percentage, r.name, r.owner_account_auths, r.owner_key_auths, r.owner_address_auths, r.active_account_auths, r.active_key_auths, r.active_address_auths, - r.voting_account); + r.voting_account, r.votes); } friend bool operator!=(const account_struct& l, const account_struct& r) { @@ -146,14 +147,14 @@ struct balance_struct { object_id_type object_id; fc::time_point_sec block_time; uint32_t block_number; - address owner; - asset_id_type asset_id; - share_type amount; + account_id_type owner; + asset_id_type asset_type; + share_type balance; friend bool operator==(const balance_struct& l, const balance_struct& r) { - return std::tie(l.object_id, l.block_time, l.block_number, l.block_time, l.owner, l.asset_id, l.amount) - == std::tie(r.object_id, r.block_time, r.block_number, r.block_time, r.owner, r.asset_id, r.amount); + return std::tie(l.object_id, l.block_time, l.block_number, l.block_time, l.owner, l.asset_type, l.balance) + == std::tie(r.object_id, r.block_time, r.block_number, r.block_time, r.owner, r.asset_type, r.balance); } friend bool operator!=(const balance_struct& l, const balance_struct& r) { @@ -201,9 +202,30 @@ struct bitasset_struct { } } //graphene::es_objects -FC_REFLECT( graphene::es_objects::proposal_struct, (object_id)(block_time)(block_number)(expiration_time)(review_period_time)(proposed_transaction)(required_active_approvals)(available_active_approvals)(required_owner_approvals)(available_owner_approvals)(available_key_approvals)(proposer) ) -FC_REFLECT( graphene::es_objects::account_struct, (object_id)(block_time)(block_number)(membership_expiration_date)(registrar)(referrer)(lifetime_referrer)(network_fee_percentage)(lifetime_referrer_fee_percentage)(referrer_rewards_percentage)(name)(owner_account_auths)(owner_key_auths)(owner_address_auths)(active_account_auths)(active_key_auths)(active_address_auths)(voting_account) ) -FC_REFLECT( graphene::es_objects::asset_struct, (object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id) ) -FC_REFLECT( graphene::es_objects::balance_struct, (object_id)(block_time)(block_number)(block_time)(owner)(asset_id)(amount) ) -FC_REFLECT( graphene::es_objects::limit_order_struct, (object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee) ) -FC_REFLECT( graphene::es_objects::bitasset_struct, (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) ) +FC_REFLECT( + graphene::es_objects::proposal_struct, + (object_id)(block_time)(block_number)(expiration_time)(review_period_time)(proposed_transaction)(required_active_approvals) + (available_active_approvals)(required_owner_approvals)(available_owner_approvals)(available_key_approvals)(proposer) +) +FC_REFLECT( + graphene::es_objects::account_struct, + (object_id)(block_time)(block_number)(membership_expiration_date)(registrar)(referrer)(lifetime_referrer) + (network_fee_percentage)(lifetime_referrer_fee_percentage)(referrer_rewards_percentage)(name)(owner_account_auths) + (owner_key_auths)(owner_address_auths)(active_account_auths)(active_key_auths)(active_address_auths)(voting_account)(votes) +) +FC_REFLECT( + graphene::es_objects::asset_struct, + (object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id) +) +FC_REFLECT( + graphene::es_objects::balance_struct, + (object_id)(block_time)(block_number)(owner)(asset_type)(balance) +) +FC_REFLECT( + graphene::es_objects::limit_order_struct, + (object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee) +) +FC_REFLECT( + graphene::es_objects::bitasset_struct, + (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) +)