/* * Copyright (c) 2017 Cryptonomex, Inc., and contributors. * * The MIT License * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. 
*/ #include #include #include #include #include namespace graphene { namespace elasticsearch { namespace detail { class elasticsearch_plugin_impl { public: elasticsearch_plugin_impl(elasticsearch_plugin& _plugin) : _self( _plugin ) { curl = curl_easy_init(); } virtual ~elasticsearch_plugin_impl(); bool update_account_histories( const signed_block& b ); graphene::chain::database& database() { return _self.database(); } elasticsearch_plugin& _self; primary_index< operation_history_index >* _oho_index; std::string _elasticsearch_node_url = "http://localhost:9200/"; uint32_t _elasticsearch_bulk_replay = 10000; uint32_t _elasticsearch_bulk_sync = 100; bool _elasticsearch_visitor = false; std::string _elasticsearch_basic_auth = ""; std::string _elasticsearch_index_prefix = "peerplays-"; bool _elasticsearch_operation_object = false; uint32_t _elasticsearch_start_es_after_block = 0; bool _elasticsearch_operation_string = true; mode _elasticsearch_mode = mode::only_save; CURL *curl; // curl handler vector bulk_lines; // vector of op lines vector prepare; graphene::utilities::ES es; uint32_t limit_documents; int16_t op_type; operation_history_struct os; block_struct bs; visitor_struct vs; bulk_struct bulk_line_struct; std::string bulk_line; std::string index_name; bool is_sync = false; fc::time_point last_sync; private: bool add_elasticsearch( const account_id_type account_id, const optional& oho, const uint32_t block_number ); const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj, const account_id_type account_id, const optional & oho); const account_statistics_object& getStatsObject(const account_id_type account_id); void growStats(const account_statistics_object& stats_obj, const account_transaction_history_object& ath); void getOperationType(const optional & oho); void doOperationHistory(const optional & oho); void doBlock(const optional & oho, const signed_block& b); void doVisitor(const optional & oho); void checkState(const 
/*
 * NOTE(review): this copy of the file appears to have been damaged by
 * extraction. The original line breaks are gone, so every `//` comment now
 * runs to the end of its physical line, and the text that used to sit inside
 * angle brackets is missing: the #include directives have no targets and
 * `vector`, `optional`, `static_cast`, `get_index_type` and similar have no
 * template arguments. The code is left exactly as found; restore the file
 * from version control before building or refactoring it.
 *
 * Guide to the physical line above this comment:
 *   - elasticsearch_plugin_impl keeps a reference to the owning plugin in
 *     _self and, in its constructor, stores the handle returned by
 *     curl_easy_init() in `curl`.
 *   - The _elasticsearch_* members carry in-class defaults (node url
 *     "http://localhost:9200/", bulk replay 10000, bulk sync 100, visitor
 *     off, empty basic auth, index prefix "peerplays-", operation object
 *     off, start block 0, operation string on, mode only_save).
 *   - bulk_lines accumulates pending bulk entries; os / bs / vs /
 *     bulk_line_struct / bulk_line / op_type are scratch members that the
 *     helper methods fill for the operation currently being processed.
 *
 * Guide to the physical line below this comment:
 *   - The remaining private helper declarations close the class.
 *   - ~elasticsearch_plugin_impl(): if `curl` is non-null it is passed to
 *     curl_easy_cleanup() and then set to nullptr.
 *   - update_account_histories(b): calls checkState(b.timestamp), derives
 *     index_name with generateIndexName(b.timestamp, prefix), then walks
 *     db.get_applied_operations().
 *       * For an empty entry it calls skip_oho_id and continues. On its
 *         first use with db._undo_db.enabled(), skip_oho_id creates and
 *         immediately removes an empty operation_history_object (the
 *         existing inline comment says this makes the id roll back on undo);
 *         otherwise it calls _oho_index->use_next_id().
 *       * For a valid entry it creates an operation_history_object copying
 *         op, result, block_num, trx_in_block, op_in_trx and virtual_op,
 *         then fills the scratch members through getOperationType,
 *         doOperationHistory, doBlock and, when _elasticsearch_visitor is
 *         set, doVisitor.
 */
fc::time_point_sec& block_time); void cleanObjects(const account_transaction_history_object& ath, account_id_type account_id); void createBulkLine(const account_transaction_history_object& ath); void prepareBulk(const account_transaction_history_id_type& ath_id); void populateESstruct(); }; elasticsearch_plugin_impl::~elasticsearch_plugin_impl() { if (curl) { curl_easy_cleanup(curl); curl = nullptr; } return; } bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b ) { checkState(b.timestamp); index_name = graphene::utilities::generateIndexName(b.timestamp, _elasticsearch_index_prefix); graphene::chain::database& db = database(); const vector >& hist = db.get_applied_operations(); bool is_first = true; auto skip_oho_id = [&is_first,&db,this]() { if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo { db.remove( db.create( []( operation_history_object& obj) {} ) ); is_first = false; } else _oho_index->use_next_id(); }; for( const optional< operation_history_object >& o_op : hist ) { optional oho; auto create_oho = [&]() { is_first = false; return optional( db.create([&](operation_history_object &h) { if (o_op.valid()) { h.op = o_op->op; h.result = o_op->result; h.block_num = o_op->block_num; h.trx_in_block = o_op->trx_in_block; h.op_in_trx = o_op->op_in_trx; h.virtual_op = o_op->virtual_op; } })); }; if( !o_op.valid() ) { skip_oho_id(); continue; } oho = create_oho(); // populate what we can before impacted loop getOperationType(oho); doOperationHistory(oho); doBlock(oho, b); if(_elasticsearch_visitor) doVisitor(oho); const operation_history_object& op = *o_op; // get the set of accounts this operation applies to flat_set impacted; vector other; // fee_payer is added here operation_get_required_authorities( op.op, impacted, impacted, other, MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) ); if( op.op.which() == operation::tag< account_create_operation >::value ) impacted.insert( 
/*
 * Guide to the physical line below this comment:
 *   - Rest of update_account_histories: `impacted` is filled by
 *     operation_get_required_authorities (it is passed for two of that
 *     call's arguments) and extended either with the operation result (for
 *     account_create_operation) or by operation_get_impacted_accounts, plus
 *     every account_auths key found in `other`. Each impacted account is
 *     handed to add_elasticsearch together with oho and b.block_num(); a
 *     false return aborts the whole call with false.
 *     After the loop, when is_sync is set, populateESstruct() is called and,
 *     if es.bulk_lines is non-empty, `prepare` is cleared and SendBulk(es)
 *     runs: failure returns false, success clears bulk_lines.
 *   - checkState(block_time): selects sync mode (limit_documents =
 *     _elasticsearch_bulk_sync, is_sync = true, last_sync = now) when the
 *     block time is less than 30 seconds behind now OR more than 60 seconds
 *     have passed since last_sync; otherwise limit_documents =
 *     _elasticsearch_bulk_replay and is_sync = false.
 *   - getOperationType(oho): stores oho->op.which() in op_type when
 *     oho->id is not null.
 *   - doOperationHistory(oho): copies trx_in_block, op_in_trx, virtual_op
 *     and the JSON-encoded result into `os`. With
 *     _elasticsearch_operation_object it also converts the operation to a
 *     variant and passes it through adaptor_struct::adapt; with
 *     _elasticsearch_operation_string it stores the JSON string of the
 *     operation in os.op.
 *   - doBlock(oho, b): fills `bs` with the block number, block timestamp
 *     and transaction id. The id stays an empty string when
 *     oho->trx_in_block is not below b.transactions.size().
 */
op.result.get() ); else operation_get_impacted_accounts( op.op, impacted, MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) ); for( auto& a : other ) for( auto& item : a.account_auths ) impacted.insert( item.first ); for( auto& account_id : impacted ) { if(!add_elasticsearch( account_id, oho, b.block_num() )) return false; } } // we send bulk at end of block when we are in sync for better real time client experience if(is_sync) { populateESstruct(); if(es.bulk_lines.size() > 0) { prepare.clear(); if(!graphene::utilities::SendBulk(es)) return false; else bulk_lines.clear(); } } return true; } void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time) { fc::time_point current_time(fc::time_point::now()); if(((current_time - block_time) < fc::seconds(30)) || (current_time - last_sync > fc::seconds(60))) { limit_documents = _elasticsearch_bulk_sync; is_sync = true; last_sync = current_time; } else { limit_documents = _elasticsearch_bulk_replay; is_sync = false; } } void elasticsearch_plugin_impl::getOperationType(const optional & oho) { if (!oho->id.is_null()) op_type = oho->op.which(); } void elasticsearch_plugin_impl::doOperationHistory(const optional & oho) { os.trx_in_block = oho->trx_in_block; os.op_in_trx = oho->op_in_trx; os.operation_result = fc::json::to_string(oho->result); os.virtual_op = oho->virtual_op; if(_elasticsearch_operation_object) { oho->op.visit(fc::from_static_variant(os.op_object, FC_PACK_MAX_DEPTH)); adaptor_struct adaptor; os.op_object = adaptor.adapt(os.op_object.get_object()); } if(_elasticsearch_operation_string) os.op = fc::json::to_string(oho->op); } void elasticsearch_plugin_impl::doBlock(const optional & oho, const signed_block& b) { std::string trx_id = ""; if(oho->trx_in_block < b.transactions.size()) trx_id = b.transactions[oho->trx_in_block].id().str(); bs.block_num = b.block_num(); bs.block_time = b.timestamp; bs.trx_id = trx_id; } void elasticsearch_plugin_impl::doVisitor(const optional & oho) { 
/*
 * Guide to the physical line below this comment:
 *   - doVisitor(oho): runs an operation_visitor over oho->op and copies its
 *     fee, transfer and fill fields into `vs`. The fill_price and is_maker
 *     assignments are commented out in the source.
 *   - add_elasticsearch(account_id, oho, block_number): fetches the
 *     account statistics object, creates the new history entry
 *     (addNewEntry) and updates the statistics (growStats). Only when
 *     block_number is greater than _elasticsearch_start_es_after_block does
 *     it build a bulk line (createBulkLine + prepareBulk). cleanObjects
 *     always runs. If `curl` is non-null and bulk_lines has reached
 *     limit_documents, `prepare` is cleared, populateESstruct() is called
 *     and SendBulk(es) runs: failure returns false, success clears
 *     bulk_lines. Otherwise the function returns true.
 *   - getStatsObject(account_id): looks the account up with db.get and
 *     returns acct.statistics(db).
 *   - addNewEntry(...): creates an account_transaction_history_object with
 *     operation_id = oho->id, the given account, sequence =
 *     stats_obj.total_ops + 1 and next = stats_obj.most_recent_op, and
 *     returns a reference to it.
 */
operation_visitor o_v; oho->op.visit(o_v); vs.fee_data.asset = o_v.fee_asset; vs.fee_data.amount = o_v.fee_amount; vs.transfer_data.asset = o_v.transfer_asset_id; vs.transfer_data.amount = o_v.transfer_amount; vs.transfer_data.from = o_v.transfer_from; vs.transfer_data.to = o_v.transfer_to; vs.fill_data.order_id = o_v.fill_order_id; vs.fill_data.account_id = o_v.fill_account_id; vs.fill_data.pays_asset_id = o_v.fill_pays_asset_id; vs.fill_data.pays_amount = o_v.fill_pays_amount; vs.fill_data.receives_asset_id = o_v.fill_receives_asset_id; vs.fill_data.receives_amount = o_v.fill_receives_amount; //vs.fill_data.fill_price = o_v.fill_fill_price; //vs.fill_data.is_maker = o_v.fill_is_maker; } bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account_id, const optional & oho, const uint32_t block_number) { const auto &stats_obj = getStatsObject(account_id); const auto &ath = addNewEntry(stats_obj, account_id, oho); growStats(stats_obj, ath); if(block_number > _elasticsearch_start_es_after_block) { createBulkLine(ath); prepareBulk(ath.id); } cleanObjects(ath, account_id); if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech prepare.clear(); populateESstruct(); if(!graphene::utilities::SendBulk(es)) return false; else bulk_lines.clear(); } return true; } const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type account_id) { graphene::chain::database& db = database(); const auto &acct = db.get(account_id); return acct.statistics(db); } const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry(const account_statistics_object& stats_obj, const account_id_type account_id, const optional & oho) { graphene::chain::database& db = database(); const auto &ath = db.create([&](account_transaction_history_object &obj) { obj.operation_id = oho->id; obj.account = account_id; obj.sequence = stats_obj.total_ops + 1; obj.next = 
/*
 * Guide to the physical line below this comment:
 *   - growStats(stats_obj, ath): modifies the statistics object so that
 *     most_recent_op = ath.id and total_ops = ath.sequence.
 *   - createBulkLine(ath): assembles bulk_line_struct from `ath`, `os`,
 *     op_type, ath.operation_id.instance.value and `bs` (plus `vs` when
 *     _elasticsearch_visitor is set) and serialises it to JSON in bulk_line.
 *   - prepareBulk(ath_id): builds the bulk header (_index = index_name,
 *     _type = "data", _id = "space.type.instance"), asks createBulk for the
 *     header/document pair and appends the result to bulk_lines.
 *     NOTE(review): the local `_id` is computed but never used in the
 *     visible code; confirm whether it can be dropped.
 */
stats_obj.most_recent_op; }); return ath; } void elasticsearch_plugin_impl::growStats(const account_statistics_object& stats_obj, const account_transaction_history_object& ath) { graphene::chain::database& db = database(); db.modify(stats_obj, [&](account_statistics_object &obj) { obj.most_recent_op = ath.id; obj.total_ops = ath.sequence; }); } void elasticsearch_plugin_impl::createBulkLine(const account_transaction_history_object& ath) { bulk_line_struct.account_history = ath; bulk_line_struct.operation_history = os; bulk_line_struct.operation_type = op_type; bulk_line_struct.operation_id_num = ath.operation_id.instance.value; bulk_line_struct.block_data = bs; if(_elasticsearch_visitor) bulk_line_struct.additional_data = vs; bulk_line = fc::json::to_string(bulk_line_struct); } void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id_type& ath_id) { const std::string _id = fc::json::to_string(ath_id); fc::mutable_variant_object bulk_header; bulk_header["_index"] = index_name; bulk_header["_type"] = "data"; bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "." 
/*
 * Guide to the physical line below this comment:
 *   - cleanObjects(ath, account_id): finds the first history entry at or
 *     after (account_id, 0) in the sequence-ordered index (the index tag
 *     names were lost in this copy). If that entry belongs to the account
 *     and is not `ath`, it is removed; the following entry of the same
 *     account, if any, gets its `next` reset to a default-constructed id;
 *     and the referenced operation history object is removed when no entry
 *     in the operation-id index still refers to it.
 *   - populateESstruct(): copies curl, bulk_lines, the node url and the
 *     basic-auth string into `es`.
 *   - elasticsearch_plugin constructor allocates the impl object;
 *     plugin_name() returns "elasticsearch"; plugin_description() returns a
 *     fixed description string.
 *   - plugin_set_program_options(cli, cfg): registers the elasticsearch-*
 *     options on `cli` and then adds `cli` to `cfg`.
 */
+ ath_id.instance; prepare = graphene::utilities::createBulk(bulk_header, bulk_line); bulk_lines.insert(bulk_lines.end(), prepare.begin(), prepare.end()); } void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_object& ath, account_id_type account_id) { graphene::chain::database& db = database(); // remove everything except current object from ath const auto &his_idx = db.get_index_type(); const auto &by_seq_idx = his_idx.indices().get(); auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0)); if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.id) { // if found, remove the entry const auto remove_op_id = itr->operation_id; const auto itr_remove = itr; ++itr; db.remove( *itr_remove ); // modify previous node's next pointer // this should be always true, but just have a check here if( itr != by_seq_idx.end() && itr->account == account_id ) { db.modify( *itr, [&]( account_transaction_history_object& obj ){ obj.next = account_transaction_history_id_type(); }); } // do the same on oho const auto &by_opid_idx = his_idx.indices().get(); if (by_opid_idx.find(remove_op_id) == by_opid_idx.end()) { db.remove(remove_op_id(db)); } } } void elasticsearch_plugin_impl::populateESstruct() { es.curl = curl; es.bulk_lines = bulk_lines; es.elasticsearch_url = _elasticsearch_node_url; es.auth = _elasticsearch_basic_auth; } } // end namespace detail elasticsearch_plugin::elasticsearch_plugin() : my( new detail::elasticsearch_plugin_impl(*this) ) { } elasticsearch_plugin::~elasticsearch_plugin() { } std::string elasticsearch_plugin::plugin_name()const { return "elasticsearch"; } std::string elasticsearch_plugin::plugin_description()const { return "Stores account history data in elasticsearch database(EXPERIMENTAL)."; } void elasticsearch_plugin::plugin_set_program_options( boost::program_options::options_description& cli, boost::program_options::options_description& cfg ) { cli.add_options() ("elasticsearch-node-url", 
/*
 * Guide to the physical line below this comment:
 *   - The option list continues; each help text repeats the default in
 *     parentheses. NOTE(review): the value types of the options
 *     (the template argument of boost::program_options::value) are missing
 *     from this copy.
 *   - plugin_initialize(options): registers the operation_history_index and
 *     account_transaction_history_index primary indexes (keeping the former
 *     in my->_oho_index) and then copies every option that is present into
 *     the matching my->_elasticsearch_* member.
 */
boost::program_options::value(), "Elastic Search database node url(http://localhost:9200/)") ("elasticsearch-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(10000)") ("elasticsearch-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a syncronied chain(100)") ("elasticsearch-visitor", boost::program_options::value(), "Use visitor to index additional data(slows down the replay(false))") ("elasticsearch-basic-auth", boost::program_options::value(), "Pass basic auth to elasticsearch database('')") ("elasticsearch-index-prefix", boost::program_options::value(), "Add a prefix to the index(peerplays-)") ("elasticsearch-operation-object", boost::program_options::value(), "Save operation as object(false)") ("elasticsearch-start-es-after-block", boost::program_options::value(), "Start doing ES job after block(0)") ("elasticsearch-operation-string", boost::program_options::value(), "Save operation as string. Needed to serve history api calls(true)") ("elasticsearch-mode", boost::program_options::value(), "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0") ; cfg.add(cli); } void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options) { my->_oho_index = database().add_index< primary_index< operation_history_index > >(); database().add_index< primary_index< account_transaction_history_index > >(); if (options.count("elasticsearch-node-url")) { my->_elasticsearch_node_url = options["elasticsearch-node-url"].as(); } if (options.count("elasticsearch-bulk-replay")) { my->_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as(); } if (options.count("elasticsearch-bulk-sync")) { my->_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as(); } if (options.count("elasticsearch-visitor")) { my->_elasticsearch_visitor = options["elasticsearch-visitor"].as(); } if (options.count("elasticsearch-basic-auth")) { 
/*
 * Guide to the physical line below this comment:
 *   - Rest of plugin_initialize: an elasticsearch-mode value greater than
 *     mode::all throws fc::exception. Unless the mode is only_query, the
 *     function throws when mode == all while
 *     _elasticsearch_operation_string is false, and otherwise connects a
 *     handler to database().applied_block that calls
 *     update_account_histories(b) and throws fc::exception when it returns
 *     false.
 *   - plugin_startup(): fills a local ES struct with the curl handle, node
 *     url and auth string, throws fc::exception when checkES(es) fails, then
 *     logs a startup message.
 *   - get_operation_by_id(id): builds a "match" query on
 *     account_history.operation_id, runs it through prepareHistoryQuery and
 *     simpleQuery, and converts the first hit's _source with
 *     fromEStoOperation.
 *     NOTE(review): the query text places the id right after
 *     `"account_history.operation_id": ` with no opening quote, yet a
 *     closing quote follows it, so the generated JSON looks malformed;
 *     confirm against the intended query.
 *   - The raw string literal that begins at the end of that line continues
 *     onto the next physical line, so nothing may be inserted between those
 *     two lines without changing the query text.
 */
my->_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as(); } if (options.count("elasticsearch-index-prefix")) { my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as(); } if (options.count("elasticsearch-operation-object")) { my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as(); } if (options.count("elasticsearch-start-es-after-block")) { my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as(); } if (options.count("elasticsearch-operation-string")) { my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as(); } if (options.count("elasticsearch-mode")) { const auto option_number = options["elasticsearch-mode"].as(); if(option_number > mode::all) FC_THROW_EXCEPTION(fc::exception, "Elasticsearch mode not valid"); my->_elasticsearch_mode = static_cast(options["elasticsearch-mode"].as()); } if(my->_elasticsearch_mode != mode::only_query) { if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string) FC_THROW_EXCEPTION(fc::exception, "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true"); database().applied_block.connect([this](const signed_block &b) { if (!my->update_account_histories(b)) FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); }); } } void elasticsearch_plugin::plugin_startup() { graphene::utilities::ES es; es.curl = my->curl; es.elasticsearch_url = my->_elasticsearch_node_url; es.auth = my->_elasticsearch_basic_auth; if(!graphene::utilities::checkES(es)) FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url)); ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin"); } operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id) { const string operation_id_string = std::string(object_id_type(id)); const string query = R"( { "query": 
{ "match": { "account_history.operation_id": )" + operation_id_string + R"(" } } } )"; auto es = prepareHistoryQuery(query); const auto response = graphene::utilities::simpleQuery(es); variant variant_response = fc::json::from_string(response); const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"]; return fromEStoOperation(source); } vector elasticsearch_plugin::get_account_history( const account_id_type account_id, operation_history_id_type stop = operation_history_id_type(), unsigned limit = 100, operation_history_id_type start = operation_history_id_type()) { const string account_id_string = std::string(object_id_type(account_id)); const auto stop_number = stop.instance.value; const auto start_number = start.instance.value; string range = ""; if(stop_number == 0) range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]"; else if(stop_number > 0) range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]"; const string query = R"( { "size": )" + fc::to_string(limit) + R"(, "sort" : [{ "operation_id_num" : {"order" : "desc"}}], "query": { "bool": { "must": [ { "query_string": { "query": "account_history.account: )" + account_id_string + range + R"(" } } ] } } } )"; auto es = prepareHistoryQuery(query); vector result; if(!graphene::utilities::checkES(es)) return result; const auto response = graphene::utilities::simpleQuery(es); variant variant_response = fc::json::from_string(response); const auto hits = variant_response["hits"]["total"]["value"]; uint32_t size; if( hits.is_object() ) // ES-7 ? 
/*
 * Notes on get_account_history, which spans the physical lines above and
 * below this comment:
 *   - The query is a query_string on account_history.account, sorted by
 *     operation_id_num descending and capped at `limit` documents. The
 *     operation_id_num range uses an inclusive lower bound when stop is 0
 *     and an exclusive lower bound otherwise.
 *   - An empty result is returned when checkES(es) fails.
 *   - NOTE(review): `hits` is already read from ["hits"]["total"]["value"],
 *     yet the object branch below indexes hits["value"] again; one of the
 *     two lookups looks redundant. Confirm against the response shapes of
 *     the Elasticsearch versions that must be supported.
 *   - NOTE(review): the next physical line jumps from `for(unsigned i=0; i`
 *     straight to `_elasticsearch_node_url;`. The loop body, the end of this
 *     function and the start of the function that fills and returns an ES
 *     query struct appear to be missing from this copy.
 *   - get_running_mode() returns my->_elasticsearch_mode.
 */
size = static_cast(hits["value"].as_uint64()); else // probably ES-6 size = static_cast(hits.as_uint64()); size = std::min( size, limit ); for(unsigned i=0; i_elasticsearch_node_url; es.index_prefix = my->_elasticsearch_index_prefix; es.endpoint = es.index_prefix + "*/data/_search"; es.query = query; return es; } mode elasticsearch_plugin::get_running_mode() { return my->_elasticsearch_mode; } } }