/* * Copyright (c) 2018 oxarbitrage, and contributors. * * The MIT License * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ #include #include #include #include #include #include #include namespace graphene { namespace es_objects { namespace detail { class es_objects_plugin_impl { public: es_objects_plugin_impl(es_objects_plugin& _plugin) : _self( _plugin ) { curl = curl_easy_init(); } virtual ~es_objects_plugin_impl(); bool updateDatabase( const vector& ids , bool isNew); es_objects_plugin& _self; std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; std::string _es_objects_auth = ""; uint32_t _es_objects_bulk_replay = 5000; uint32_t _es_objects_bulk_sync = 10; bool _es_objects_proposals = true; bool _es_objects_accounts = true; bool _es_objects_assets = true; bool _es_objects_balances = true; bool _es_objects_limit_orders = true; bool _es_objects_asset_bitasset = true; std::string _es_objects_index_prefix = "objects-"; CURL *curl; // curl handler vector bulk; vector prepare; map bitassets; map accounts; map proposals; map assets; map balances; map limit_orders; private: void PrepareProposal(const proposal_object& proposal_object, const fc::time_point_sec& block_time, const uint32_t& block_number); void PrepareAccount(const account_object& account_object, const fc::time_point_sec& block_time, const uint32_t& block_number); void PrepareAsset(const asset_object& asset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); void PrepareBalance(const balance_object& balance_object, const fc::time_point_sec& block_time, const uint32_t& block_number); void PrepareLimit(const limit_order_object& limit_object, const fc::time_point_sec& block_time, const uint32_t& block_number); void PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); }; bool es_objects_plugin_impl::updateDatabase( const vector& ids , bool isNew) { graphene::chain::database &db = _self.database(); const fc::time_point_sec block_time = db.head_block_time(); const uint32_t block_number = db.head_block_num(); // check if we are in replay or in sync and change number of bulk documents accordingly uint32_t limit_documents = 0; if((fc::time_point::now() - block_time) < fc::seconds(30)) limit_documents = _es_objects_bulk_sync; else limit_documents = _es_objects_bulk_replay; for(auto const& value: ids) { if(value.is() && _es_objects_proposals) { auto obj = db.find_object(value); auto p = static_cast(obj); if(p != nullptr) PrepareProposal(*p, block_time, block_number); } else if(value.is() && _es_objects_accounts) { auto obj = db.find_object(value); auto a = static_cast(obj); if(a != nullptr) PrepareAccount(*a, block_time, block_number); } else if(value.is() && _es_objects_assets) { auto obj = db.find_object(value); auto a = static_cast(obj); if(a != nullptr) PrepareAsset(*a, block_time, block_number); } else if(value.is() && _es_objects_balances) { auto obj = db.find_object(value); auto b = static_cast(obj); if(b != nullptr) PrepareBalance(*b, block_time, block_number); } else if(value.is() && _es_objects_limit_orders) { auto obj = db.find_object(value); auto l = static_cast(obj); if(l != nullptr) PrepareLimit(*l, block_time, block_number); } else if(value.is() && _es_objects_asset_bitasset) { auto obj = db.find_object(value); auto ba = static_cast(obj); if(ba != nullptr) PrepareBitAsset(*ba, block_time, block_number); } } if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech graphene::utilities::ES es; es.curl = curl; es.bulk_lines = bulk; es.elasticsearch_url = _es_objects_elasticsearch_url; es.auth = _es_objects_auth; if(!graphene::utilities::SendBulk(es)) return false; else bulk.clear(); } return true; } void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_object, const fc::time_point_sec& block_time, const uint32_t& block_number) { proposal_struct prop; prop.object_id = proposal_object.id; prop.block_time = block_time; prop.block_number = block_number; prop.expiration_time = proposal_object.expiration_time; prop.review_period_time = proposal_object.review_period_time; prop.proposed_transaction = fc::json::to_string(proposal_object.proposed_transaction); prop.required_owner_approvals = fc::json::to_string(proposal_object.required_owner_approvals); prop.available_owner_approvals = fc::json::to_string(proposal_object.available_owner_approvals); prop.required_active_approvals = fc::json::to_string(proposal_object.required_active_approvals); prop.available_key_approvals = fc::json::to_string(proposal_object.available_key_approvals); prop.proposer = proposal_object.proposer; auto it = proposals.find(proposal_object.id); if(it == proposals.end()) proposals[proposal_object.id] = prop; else { if(it->second == prop) return; else proposals[proposal_object.id] = prop; } std::string data = fc::json::to_string(prop); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "proposal"; bulk_header["_type"] = "data"; prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } void es_objects_plugin_impl::PrepareAccount(const account_object& account_object, const fc::time_point_sec& block_time, const uint32_t& block_number) { account_struct acct; acct.object_id = account_object.id; acct.block_time = block_time; acct.block_number = block_number; acct.membership_expiration_date = account_object.membership_expiration_date; acct.registrar = account_object.registrar; acct.referrer = account_object.referrer; acct.lifetime_referrer = account_object.lifetime_referrer; acct.network_fee_percentage = account_object.network_fee_percentage; acct.lifetime_referrer_fee_percentage = account_object.lifetime_referrer_fee_percentage; acct.referrer_rewards_percentage = account_object.referrer_rewards_percentage; acct.name = account_object.name; acct.owner_account_auths = fc::json::to_string(account_object.owner.account_auths); acct.owner_key_auths = fc::json::to_string(account_object.owner.key_auths); acct.owner_address_auths = fc::json::to_string(account_object.owner.address_auths); acct.active_account_auths = fc::json::to_string(account_object.active.account_auths); acct.active_key_auths = fc::json::to_string(account_object.active.key_auths); acct.active_address_auths = fc::json::to_string(account_object.active.address_auths); acct.voting_account = account_object.options.voting_account; auto it = accounts.find(account_object.id); if(it == accounts.end()) accounts[account_object.id] = acct; else { if(it->second == acct) return; else accounts[account_object.id] = acct; } std::string data = fc::json::to_string(acct); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "acount"; bulk_header["_type"] = "data"; prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object, const fc::time_point_sec& block_time, const uint32_t& block_number) { asset_struct asset; asset.object_id = asset_object.id; asset.block_time = block_time; asset.block_number = block_number; asset.symbol = asset_object.symbol; asset.issuer = asset_object.issuer; asset.is_market_issued = asset_object.is_market_issued(); asset.dynamic_asset_data_id = asset_object.dynamic_asset_data_id; asset.bitasset_data_id = asset_object.bitasset_data_id; auto it = assets.find(asset_object.id); if(it == assets.end()) assets[asset_object.id] = asset; else { if(it->second == asset) return; else assets[asset_object.id] = asset; } std::string data = fc::json::to_string(asset); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "asset"; bulk_header["_type"] = "data"; prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } void es_objects_plugin_impl::PrepareBalance(const balance_object& balance_object, const fc::time_point_sec& block_time, const uint32_t& block_number) { balance_struct balance; balance.object_id = balance_object.id; balance.block_time = block_time; balance.block_number = block_number;balance.owner = balance_object.owner; balance.asset_id = balance_object.balance.asset_id; balance.amount = balance_object.balance.amount; auto it = balances.find(balance_object.id); if(it == balances.end()) balances[balance_object.id] = balance; else { if(it->second == balance) return; else balances[balance_object.id] = balance; } std::string data = fc::json::to_string(balance); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "balance"; bulk_header["_type"] = "data"; prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object, const fc::time_point_sec& block_time, const uint32_t& block_number) { limit_order_struct limit; limit.object_id = limit_object.id; limit.block_time = block_time; limit.block_number = block_number; limit.expiration = limit_object.expiration; limit.seller = limit_object.seller; limit.for_sale = limit_object.for_sale; limit.sell_price = limit_object.sell_price; limit.deferred_fee = limit_object.deferred_fee; auto it = limit_orders.find(limit_object.id); if(it == limit_orders.end()) limit_orders[limit_object.id] = limit; else { if(it->second == limit) return; else limit_orders[limit_object.id] = limit; } std::string data = fc::json::to_string(limit); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "limitorder"; bulk_header["_type"] = "data"; prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, const fc::time_point_sec& block_time, const uint32_t& block_number) { if(!bitasset_object.is_prediction_market) { bitasset_struct bitasset; bitasset.object_id = bitasset_object.id; bitasset.block_time = block_time; bitasset.block_number = block_number; bitasset.current_feed = fc::json::to_string(bitasset_object.current_feed); bitasset.current_feed_publication_time = bitasset_object.current_feed_publication_time; auto it = bitassets.find(bitasset_object.id); if(it == bitassets.end()) bitassets[bitasset_object.id] = bitasset; else { if(it->second == bitasset) return; else bitassets[bitasset_object.id] = bitasset; } std::string data = fc::json::to_string(bitasset); fc::mutable_variant_object bulk_header; bulk_header["_index"] = _es_objects_index_prefix + "bitasset"; bulk_header["_type"] = "data"; prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } } es_objects_plugin_impl::~es_objects_plugin_impl() { return; } } // end namespace detail es_objects_plugin::es_objects_plugin() : my( new detail::es_objects_plugin_impl(*this) ) { } es_objects_plugin::~es_objects_plugin() { } std::string es_objects_plugin::plugin_name()const { return "es_objects"; } std::string es_objects_plugin::plugin_description()const { return "Stores blockchain objects in ES database. Experimental."; } void es_objects_plugin::plugin_set_program_options( boost::program_options::options_description& cli, boost::program_options::options_description& cfg ) { cli.add_options() ("es-objects-elasticsearch-url", boost::program_options::value(), "Elasticsearch node url") ("es-objects-auth", boost::program_options::value(), "Basic auth username:password") ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(5000)") ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a syncronied chain(10)") ("es-objects-proposals", boost::program_options::value(), "Store proposal objects") ("es-objects-accounts", boost::program_options::value(), "Store account objects") ("es-objects-assets", boost::program_options::value(), "Store asset objects") ("es-objects-balances", boost::program_options::value(), "Store balances objects") ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects") ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data") ("es-objects-index-prefix", boost::program_options::value(), "Add a prefix to the index(objects-)") ; cfg.add(cli); } void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options) { database().new_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { if(!my->updateDatabase(ids, 1)) { FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); } }); database().changed_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { if(!my->updateDatabase(ids, 0)) { FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); } }); if (options.count("es-objects-elasticsearch-url")) { my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); } if (options.count("es-objects-auth")) { my->_es_objects_auth = options["es-objects-auth"].as(); } if (options.count("es-objects-bulk-replay")) { my->_es_objects_bulk_replay = options["es-objects-bulk-replay"].as(); } if (options.count("es-objects-bulk-sync")) { my->_es_objects_bulk_sync = options["es-objects-bulk-sync"].as(); } if (options.count("es-objects-proposals")) { my->_es_objects_proposals = options["es-objects-proposals"].as(); } if (options.count("es-objects-accounts")) { my->_es_objects_accounts = options["es-objects-accounts"].as(); } if (options.count("es-objects-assets")) { my->_es_objects_assets = options["es-objects-assets"].as(); } if (options.count("es-objects-balances")) { my->_es_objects_balances = options["es-objects-balances"].as(); } if (options.count("es-objects-limit-orders")) { my->_es_objects_limit_orders = options["es-objects-limit-orders"].as(); } if (options.count("es-objects-asset-bitasset")) { my->_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as(); } if (options.count("es-objects-index-prefix")) { my->_es_objects_index_prefix = options["es-objects-index-prefix"].as(); } } void es_objects_plugin::plugin_startup() { graphene::utilities::ES es; es.curl = my->curl; es.elasticsearch_url = my->_es_objects_elasticsearch_url; es.auth = my->_es_objects_auth; es.auth = my->_es_objects_index_prefix; if(!graphene::utilities::checkES(es)) FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_es_objects_elasticsearch_url)); ilog("elasticsearch OBJECTS: plugin_startup() begin"); } } }