From c9583f4486605ecb5152881d69ec6de1c833f834 Mon Sep 17 00:00:00 2001 From: Abit Date: Thu, 2 Aug 2018 15:31:54 +0000 Subject: [PATCH] Merge pull request #1201 from oxarbitrage/elasticsearch_tests2 Elasticsearch refactor --- .../elasticsearch/elasticsearch_plugin.cpp | 383 ++++++++++-------- .../elasticsearch/elasticsearch_plugin.hpp | 58 ++- libraries/plugins/es_objects/es_objects.cpp | 317 ++++++++++----- .../graphene/es_objects/es_objects.hpp | 74 +++- libraries/utilities/elasticsearch.cpp | 223 ++++++---- .../graphene/utilities/elasticsearch.hpp | 40 +- tests/CMakeLists.txt | 20 +- tests/common/database_fixture.cpp | 51 ++- tests/elasticsearch/main.cpp | 213 ++++++++++ 9 files changed, 985 insertions(+), 394 deletions(-) create mode 100644 tests/elasticsearch/main.cpp diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index a7d3fefa..b69ff64a 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -23,26 +23,11 @@ */ #include - #include - #include -#include -#include -#include -#include -#include -#include - #include -#include - #include -#include -#include -#include -#include -#include +#include namespace graphene { namespace elasticsearch { @@ -57,7 +42,7 @@ class elasticsearch_plugin_impl { curl = curl_easy_init(); } virtual ~elasticsearch_plugin_impl(); - void update_account_histories( const signed_block& b ); + bool update_account_histories( const signed_block& b ); graphene::chain::database& database() { @@ -70,15 +55,39 @@ class elasticsearch_plugin_impl std::string _elasticsearch_node_url = "http://localhost:9200/"; uint32_t _elasticsearch_bulk_replay = 10000; uint32_t _elasticsearch_bulk_sync = 100; - bool _elasticsearch_logs = true; bool _elasticsearch_visitor = false; + std::string _elasticsearch_basic_auth = ""; + std::string _elasticsearch_index_prefix = "bitshares-"; CURL *curl; // curl handler - vector bulk; // vector of op lines - private: - void add_elasticsearch( const account_id_type account_id, const optional& oho, const signed_block& b ); - void createBulkLine(account_transaction_history_object ath, operation_history_struct os, int op_type, block_struct bs, visitor_struct vs); - void sendBulk(std::string _elasticsearch_node_url, bool _elasticsearch_logs); + vector bulk_lines; // vector of op lines + vector prepare; + graphene::utilities::ES es; + uint32_t limit_documents; + int16_t op_type; + operation_history_struct os; + block_struct bs; + visitor_struct vs; + bulk_struct bulk_line_struct; + std::string bulk_line; + std::string index_name; + bool is_sync = false; + private: + bool add_elasticsearch( const account_id_type account_id, const optional& oho, const signed_block& b ); + const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj, + const account_id_type account_id, + const optional & oho); + const account_statistics_object& getStatsObject(const account_id_type account_id); + void growStats(const account_statistics_object& stats_obj, const account_transaction_history_object& ath); + void getOperationType(const optional & oho); + void doOperationHistory(const optional & oho); + void doBlock(const optional & oho, const signed_block& b); + void doVisitor(const optional & oho); + void checkState(const fc::time_point_sec& block_time); + void cleanObjects(const account_transaction_history_object& ath, account_id_type account_id); + void createBulkLine(const account_transaction_history_object& ath); + void prepareBulk(const account_transaction_history_id_type& ath_id); + void populateESstruct(); }; elasticsearch_plugin_impl::~elasticsearch_plugin_impl() @@ -86,8 +95,11 @@ elasticsearch_plugin_impl::~elasticsearch_plugin_impl() return; } -void elasticsearch_plugin_impl::update_account_histories( const signed_block& b ) +bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b ) { + checkState(b.timestamp); + index_name = graphene::utilities::generateIndexName(b.timestamp, _elasticsearch_index_prefix); + graphene::chain::database& db = database(); const vector >& hist = db.get_applied_operations(); bool is_first = true; @@ -125,6 +137,13 @@ void elasticsearch_plugin_impl::update_account_histories( const signed_block& b } oho = create_oho(); + // populate what we can before impacted loop + getOperationType(oho); + doOperationHistory(oho); + doBlock(oho, b); + if(_elasticsearch_visitor) + doVisitor(oho); + const operation_history_object& op = *o_op; // get the set of accounts this operation applies to @@ -143,17 +162,124 @@ void elasticsearch_plugin_impl::update_account_histories( const signed_block& b for( auto& account_id : impacted ) { - add_elasticsearch( account_id, oho, b ); + if(!add_elasticsearch( account_id, oho, b )) + return false; } } + // we send bulk at end of block when we are in sync for better real time client experience + if(is_sync) + { + populateESstruct(); + if(es.bulk_lines.size() > 0) + { + prepare.clear(); + if(!graphene::utilities::SendBulk(es)) + return false; + else + bulk_lines.clear(); + } + } + + return true; +} + +void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time) +{ + if((fc::time_point::now() - block_time) < fc::seconds(30)) + { + limit_documents = _elasticsearch_bulk_sync; + is_sync = true; + } + else + { + limit_documents = _elasticsearch_bulk_replay; + is_sync = false; + } } -void elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account_id, const optional & oho, const signed_block& b) +void elasticsearch_plugin_impl::getOperationType(const optional & oho) +{ + if (!oho->id.is_null()) + op_type = oho->op.which(); +} + +void elasticsearch_plugin_impl::doOperationHistory(const optional & oho) +{ + os.trx_in_block = oho->trx_in_block; + os.op_in_trx = oho->op_in_trx; + os.operation_result = fc::json::to_string(oho->result); + os.virtual_op = oho->virtual_op; + os.op = fc::json::to_string(oho->op); +} + +void elasticsearch_plugin_impl::doBlock(const optional & oho, const signed_block& b) +{ + std::string trx_id = ""; + if(oho->trx_in_block < b.transactions.size()) + trx_id = b.transactions[oho->trx_in_block].id().str(); + bs.block_num = b.block_num(); + bs.block_time = b.timestamp; + bs.trx_id = trx_id; +} + +void elasticsearch_plugin_impl::doVisitor(const optional & oho) +{ + operation_visitor o_v; + oho->op.visit(o_v); + + vs.fee_data.asset = o_v.fee_asset; + vs.fee_data.amount = o_v.fee_amount; + + vs.transfer_data.asset = o_v.transfer_asset_id; + vs.transfer_data.amount = o_v.transfer_amount; + vs.transfer_data.from = o_v.transfer_from; + vs.transfer_data.to = o_v.transfer_to; + + vs.fill_data.order_id = o_v.fill_order_id; + vs.fill_data.account_id = o_v.fill_account_id; + vs.fill_data.pays_asset_id = o_v.fill_pays_asset_id; + vs.fill_data.pays_amount = o_v.fill_pays_amount; + vs.fill_data.receives_asset_id = o_v.fill_receives_asset_id; + vs.fill_data.receives_amount = o_v.fill_receives_amount; + //vs.fill_data.fill_price = o_v.fill_fill_price; + //vs.fill_data.is_maker = o_v.fill_is_maker; +} + +bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account_id, + const optional & oho, + const signed_block& b) +{ + const auto &stats_obj = getStatsObject(account_id); + const auto &ath = addNewEntry(stats_obj, account_id, oho); + growStats(stats_obj, ath); + createBulkLine(ath); + prepareBulk(ath.id); + cleanObjects(ath, account_id); + + if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech + prepare.clear(); + populateESstruct(); + if(!graphene::utilities::SendBulk(es)) + return false; + else + bulk_lines.clear(); + } + + return true; +} + +const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type account_id) { graphene::chain::database& db = database(); - const auto &stats_obj = account_id(db).statistics(db); + const auto &acct = db.get(account_id); + return acct.statistics(db); +} - // add new entry +const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry(const account_statistics_object& stats_obj, + const account_id_type account_id, + const optional & oho) +{ + graphene::chain::database& db = database(); const auto &ath = db.create([&](account_transaction_history_object &obj) { obj.operation_id = oho->id; obj.account = account_id; @@ -161,62 +287,45 @@ void elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account obj.next = stats_obj.most_recent_op; }); - // keep stats growing as no op will be removed + return ath; +} + +void elasticsearch_plugin_impl::growStats(const account_statistics_object& stats_obj, + const account_transaction_history_object& ath) +{ + graphene::chain::database& db = database(); db.modify(stats_obj, [&](account_statistics_object &obj) { obj.most_recent_op = ath.id; obj.total_ops = ath.sequence; }); +} - // operation_type - int op_type = -1; - if (!oho->id.is_null()) - op_type = oho->op.which(); +void elasticsearch_plugin_impl::createBulkLine(const account_transaction_history_object& ath) +{ + bulk_line_struct.account_history = ath; + bulk_line_struct.operation_history = os; + bulk_line_struct.operation_type = op_type; + bulk_line_struct.operation_id_num = ath.operation_id.instance.value; + bulk_line_struct.block_data = bs; + if(_elasticsearch_visitor) + bulk_line_struct.additional_data = vs; + bulk_line = fc::json::to_string(bulk_line_struct); +} - // operation history data - operation_history_struct os; - os.trx_in_block = oho->trx_in_block; - os.op_in_trx = oho->op_in_trx; - os.operation_result = fc::json::to_string(oho->result); - os.virtual_op = oho->virtual_op; - os.op = fc::json::to_string(oho->op); - - // visitor data - visitor_struct vs; - if(_elasticsearch_visitor) { - operation_visitor o_v; - oho->op.visit(o_v); - - vs.fee_data.asset = o_v.fee_asset; - vs.fee_data.amount = o_v.fee_amount; - vs.transfer_data.asset = o_v.transfer_asset_id; - vs.transfer_data.amount = o_v.transfer_amount; - vs.transfer_data.from = o_v.transfer_from; - vs.transfer_data.to = o_v.transfer_to; - } - - // block data - std::string trx_id = ""; - if(!b.transactions.empty() && oho->trx_in_block < b.transactions.size()) { - trx_id = b.transactions[oho->trx_in_block].id().str(); - } - block_struct bs; - bs.block_num = b.block_num(); - bs.block_time = b.timestamp; - bs.trx_id = trx_id; - - // check if we are in replay or in sync and change number of bulk documents accordingly - uint32_t limit_documents = 0; - if((fc::time_point::now() - b.timestamp) < fc::seconds(30)) - limit_documents = _elasticsearch_bulk_sync; - else - limit_documents = _elasticsearch_bulk_replay; - - createBulkLine(ath, os, op_type, bs, vs); // we have everything, creating bulk line - - if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech - sendBulk(_elasticsearch_node_url, _elasticsearch_logs); - } +void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id_type& ath_id) +{ + const std::string _id = fc::json::to_string(ath_id); + fc::mutable_variant_object bulk_header; + bulk_header["_index"] = index_name; + bulk_header["_type"] = "data"; + bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "." + ath_id.instance; + prepare = graphene::utilities::createBulk(bulk_header, bulk_line); + bulk_lines.insert(bulk_lines.end(), prepare.begin(), prepare.end()); +} +void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_object& ath, account_id_type account_id) +{ + graphene::chain::database& db = database(); // remove everything except current object from ath const auto &his_idx = db.get_index_type(); const auto &by_seq_idx = his_idx.indices().get(); @@ -243,95 +352,12 @@ void elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account } } -void elasticsearch_plugin_impl::createBulkLine(account_transaction_history_object ath, operation_history_struct os, int op_type, block_struct bs, visitor_struct vs) +void elasticsearch_plugin_impl::populateESstruct() { - bulk_struct bulks; - bulks.account_history = ath; - bulks.operation_history = os; - bulks.operation_type = op_type; - bulks.block_data = bs; - bulks.additional_data = vs; - - std::string alltogether = fc::json::to_string(bulks); - - auto block_date = bulks.block_data.block_time.to_iso_string(); - std::vector parts; - boost::split(parts, block_date, boost::is_any_of("-")); - std::string index_name = "graphene-" + parts[0] + "-" + parts[1]; - - // bulk header before each line, op_type = create to avoid dups, index id will be ath id(2.9.X). - std::string _id = fc::json::to_string(ath.id); - bulk.push_back("{ \"index\" : { \"_index\" : \""+index_name+"\", \"_type\" : \"data\", \"op_type\" : \"create\", \"_id\" : "+_id+" } }"); // header - bulk.push_back(alltogether); -} - -void elasticsearch_plugin_impl::sendBulk(std::string _elasticsearch_node_url, bool _elasticsearch_logs) -{ - - // curl buffers to read - std::string readBuffer; - std::string readBuffer_logs; - - std::string bulking = ""; - - bulking = boost::algorithm::join(bulk, "\n"); - bulking = bulking + "\n"; - bulk.clear(); - - //wlog((bulking)); - - struct curl_slist *headers = NULL; - curl_slist_append(headers, "Content-Type: application/json"); - std::string url = _elasticsearch_node_url + "_bulk"; - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_POST, true); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, bulking.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&readBuffer); - curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); - //curl_easy_setopt(curl, CURLOPT_VERBOSE, true); - curl_easy_perform(curl); - - long http_code = 0; - curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); - if(http_code == 200) { - // all good, do nothing - } - else if(http_code == 429) { - // repeat request? - } - else { - // exit everything ? - } - - if(_elasticsearch_logs) { - auto logs = readBuffer; - // do logs - std::string url_logs = _elasticsearch_node_url + "logs/data/"; - curl_easy_setopt(curl, CURLOPT_URL, url_logs.c_str()); - curl_easy_setopt(curl, CURLOPT_POST, true); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, logs.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &readBuffer_logs); - curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); - //curl_easy_setopt(curl, CURLOPT_VERBOSE, true); - //ilog("log here curl: ${output}", ("output", readBuffer_logs)); - curl_easy_perform(curl); - - http_code = 0; - curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); - if(http_code == 200) { - // all good, do nothing - } - else if(http_code == 429) { - // repeat request? - } - else { - // exit everything ? - } - } + es.curl = curl; + es.bulk_lines = bulk_lines; + es.elasticsearch_url = _elasticsearch_node_url; + es.auth = _elasticsearch_basic_auth; } } // end namespace detail @@ -363,15 +389,21 @@ void elasticsearch_plugin::plugin_set_program_options( ("elasticsearch-node-url", boost::program_options::value(), "Elastic Search database node url") ("elasticsearch-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(5000)") ("elasticsearch-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a syncronied chain(10)") - ("elasticsearch-logs", boost::program_options::value(), "Log bulk events to database") ("elasticsearch-visitor", boost::program_options::value(), "Use visitor to index additional data(slows down the replay)") + ("elasticsearch-basic-auth", boost::program_options::value(), "Pass basic auth to elasticsearch database ") + ("elasticsearch-index-prefix", boost::program_options::value(), "Add a prefix to the index(bitshares-)") ; cfg.add(cli); } void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options) { - database().applied_block.connect( [&]( const signed_block& b){ my->update_account_histories(b); } ); + database().applied_block.connect( [&]( const signed_block& b) { + if(!my->update_account_histories(b)) + { + FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); + } + } ); my->_oho_index = database().add_index< primary_index< operation_history_index > >(); database().add_index< primary_index< account_transaction_history_index > >(); @@ -384,16 +416,27 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia if (options.count("elasticsearch-bulk-sync")) { my->_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as(); } - if (options.count("elasticsearch-logs")) { - my->_elasticsearch_logs = options["elasticsearch-logs"].as(); - } if (options.count("elasticsearch-visitor")) { my->_elasticsearch_visitor = options["elasticsearch-visitor"].as(); } + if (options.count("elasticsearch-basic-auth")) { + my->_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as(); + } + if (options.count("elasticsearch-index-prefix")) { + my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as(); + } } void elasticsearch_plugin::plugin_startup() { + graphene::utilities::ES es; + es.curl = my->curl; + es.elasticsearch_url = my->_elasticsearch_node_url; + es.auth = my->_elasticsearch_basic_auth; + + if(!graphene::utilities::checkES(es)) + FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url)); + ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin"); } } } diff --git a/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp b/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp index cc51c247..19a48843 100644 --- a/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp +++ b/libraries/plugins/elasticsearch/include/graphene/elasticsearch/elasticsearch_plugin.hpp @@ -29,9 +29,6 @@ namespace graphene { namespace elasticsearch { using namespace chain; - //using namespace graphene::db; - //using boost::multi_index_container; - //using namespace boost::multi_index; // // Plugins should #define their SPACE_ID's so plugins with @@ -47,12 +44,6 @@ namespace graphene { namespace elasticsearch { #define ELASTICSEARCH_SPACE_ID 6 #endif -static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) -{ - ((std::string*)userp)->append((char*)contents, size * nmemb); - return size * nmemb; -} - namespace detail { class elasticsearch_plugin_impl; @@ -76,7 +67,6 @@ class elasticsearch_plugin : public graphene::app::plugin std::unique_ptr my; }; - struct operation_visitor { typedef void result_type; @@ -99,6 +89,31 @@ struct operation_visitor transfer_from = o.from; transfer_to = o.to; } + + object_id_type fill_order_id; + account_id_type fill_account_id; + asset_id_type fill_pays_asset_id; + share_type fill_pays_amount; + asset_id_type fill_receives_asset_id; + share_type fill_receives_amount; + //double fill_fill_price; + //bool fill_is_maker; + + void operator()( const graphene::chain::fill_order_operation& o ) + { + fee_asset = o.fee.asset_id; + fee_amount = o.fee.amount; + + fill_order_id = o.order_id; + fill_account_id = o.account_id; + fill_pays_asset_id = o.pays.asset_id; + fill_pays_amount = o.pays.amount; + fill_receives_asset_id = o.receives.asset_id; + fill_receives_amount = o.receives.amount; + //fill_fill_price = o.fill_price.to_real(); + //fill_is_maker = o.is_maker; + } + template void operator()( const T& o ) { @@ -133,27 +148,38 @@ struct transfer_struct { account_id_type to; }; +struct fill_struct { + object_id_type order_id; + account_id_type account_id; + asset_id_type pays_asset_id; + share_type pays_amount; + asset_id_type receives_asset_id; + share_type receives_amount; + double fill_price; + bool is_maker; +}; + struct visitor_struct { fee_struct fee_data; transfer_struct transfer_data; + fill_struct fill_data; }; struct bulk_struct { account_transaction_history_object account_history; operation_history_struct operation_history; int operation_type; + int operation_id_num; block_struct block_data; - visitor_struct additional_data; + optional additional_data; }; - } } //graphene::elasticsearch FC_REFLECT( graphene::elasticsearch::operation_history_struct, (trx_in_block)(op_in_trx)(operation_result)(virtual_op)(op) ) FC_REFLECT( graphene::elasticsearch::block_struct, (block_num)(block_time)(trx_id) ) FC_REFLECT( graphene::elasticsearch::fee_struct, (asset)(amount) ) FC_REFLECT( graphene::elasticsearch::transfer_struct, (asset)(amount)(from)(to) ) -FC_REFLECT( graphene::elasticsearch::visitor_struct, (fee_data)(transfer_data) ) -FC_REFLECT( graphene::elasticsearch::bulk_struct, (account_history)(operation_history)(operation_type)(block_data)(additional_data) ) - - +FC_REFLECT( graphene::elasticsearch::fill_struct, (order_id)(account_id)(pays_asset_id)(pays_amount)(receives_asset_id)(receives_amount)(fill_price)(is_maker)) +FC_REFLECT( graphene::elasticsearch::visitor_struct, (fee_data)(transfer_data)(fill_data) ) +FC_REFLECT( graphene::elasticsearch::bulk_struct, (account_history)(operation_history)(operation_type)(operation_id_num)(block_data)(additional_data) ) diff --git a/libraries/plugins/es_objects/es_objects.cpp b/libraries/plugins/es_objects/es_objects.cpp index 7c9c2b61..5f04e40c 100644 --- a/libraries/plugins/es_objects/es_objects.cpp +++ b/libraries/plugins/es_objects/es_objects.cpp @@ -47,10 +47,11 @@ class es_objects_plugin_impl { curl = curl_easy_init(); } virtual ~es_objects_plugin_impl(); - void updateDatabase( const vector& ids , bool isNew); + bool updateDatabase( const vector& ids , bool isNew); es_objects_plugin& _self; std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; + std::string _es_objects_auth = ""; uint32_t _es_objects_bulk_replay = 5000; uint32_t _es_objects_bulk_sync = 10; bool _es_objects_proposals = true; @@ -59,24 +60,27 @@ class es_objects_plugin_impl bool _es_objects_balances = true; bool _es_objects_limit_orders = true; bool _es_objects_asset_bitasset = true; - bool _es_objects_logs = true; + std::string _es_objects_index_prefix = "objects-"; CURL *curl; // curl handler vector bulk; vector prepare; - map bitassets; - //uint32_t bitasset_seq; + map bitassets; + map accounts; + map proposals; + map assets; + map balances; + map limit_orders; private: - void PrepareProposal(const proposal_object* proposal_object, const fc::time_point_sec block_time, uint32_t block_number); - void PrepareAccount(const account_object* account_object, const fc::time_point_sec block_time, uint32_t block_number); - void PrepareAsset(const asset_object* asset_object, const fc::time_point_sec block_time, uint32_t block_number); - void PrepareBalance(const balance_object* balance_object, const fc::time_point_sec block_time, uint32_t block_number); - void PrepareLimit(const limit_order_object* limit_object, const fc::time_point_sec block_time, uint32_t block_number); - void PrepareBitAsset(const asset_bitasset_data_object* bitasset_object, const fc::time_point_sec block_time, uint32_t block_number); + void PrepareProposal(const proposal_object& proposal_object, const fc::time_point_sec& block_time, const uint32_t& block_number); + void PrepareAccount(const account_object& account_object, const fc::time_point_sec& block_time, const uint32_t& block_number); + void PrepareAsset(const asset_object& asset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); + void PrepareBalance(const balance_object& balance_object, const fc::time_point_sec& block_time, const uint32_t& block_number); + void PrepareLimit(const limit_order_object& limit_object, const fc::time_point_sec& block_time, const uint32_t& block_number); + void PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, const fc::time_point_sec& block_time, const uint32_t& block_number); }; -void es_objects_plugin_impl::updateDatabase( const vector& ids , bool isNew) +bool es_objects_plugin_impl::updateDatabase( const vector& ids , bool isNew) { - graphene::chain::database &db = _self.database(); const fc::time_point_sec block_time = db.head_block_time(); @@ -89,175 +93,259 @@ void es_objects_plugin_impl::updateDatabase( const vector& ids , else limit_documents = _es_objects_bulk_replay; - if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech - if(!graphene::utilities::SendBulk(curl, bulk, _es_objects_elasticsearch_url, _es_objects_logs, "objects_logs")) - elog("Error sending data to database"); - bulk.clear(); - } - for(auto const& value: ids) { if(value.is() && _es_objects_proposals) { auto obj = db.find_object(value); auto p = static_cast(obj); if(p != nullptr) - PrepareProposal(p, block_time, block_number); + PrepareProposal(*p, block_time, block_number); } else if(value.is() && _es_objects_accounts) { auto obj = db.find_object(value); auto a = static_cast(obj); if(a != nullptr) - PrepareAccount(a, block_time, block_number); + PrepareAccount(*a, block_time, block_number); } else if(value.is() && _es_objects_assets) { auto obj = db.find_object(value); auto a = static_cast(obj); if(a != nullptr) - PrepareAsset(a, block_time, block_number); + PrepareAsset(*a, block_time, block_number); } else if(value.is() && _es_objects_balances) { auto obj = db.find_object(value); auto b = static_cast(obj); if(b != nullptr) - PrepareBalance(b, block_time, block_number); + PrepareBalance(*b, block_time, block_number); } else if(value.is() && _es_objects_limit_orders) { auto obj = db.find_object(value); auto l = static_cast(obj); if(l != nullptr) - PrepareLimit(l, block_time, block_number); + PrepareLimit(*l, block_time, block_number); } else if(value.is() && _es_objects_asset_bitasset) { auto obj = db.find_object(value); auto ba = static_cast(obj); if(ba != nullptr) - PrepareBitAsset(ba, block_time, block_number); + PrepareBitAsset(*ba, block_time, block_number); } } + + if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech + + graphene::utilities::ES es; + es.curl = curl; + es.bulk_lines = bulk; + es.elasticsearch_url = _es_objects_elasticsearch_url; + es.auth = _es_objects_auth; + + if(!graphene::utilities::SendBulk(es)) + return false; + else + bulk.clear(); + } + + return true; } -void es_objects_plugin_impl::PrepareProposal(const proposal_object* proposal_object, const fc::time_point_sec block_time, uint32_t block_number) +void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_object, + const fc::time_point_sec& block_time, const uint32_t& block_number) { proposal_struct prop; - prop.object_id = proposal_object->id; + prop.object_id = proposal_object.id; prop.block_time = block_time; prop.block_number = block_number; - prop.expiration_time = proposal_object->expiration_time; - prop.review_period_time = proposal_object->review_period_time; - prop.proposed_transaction = fc::json::to_string(proposal_object->proposed_transaction); - prop.required_owner_approvals = fc::json::to_string(proposal_object->required_owner_approvals); - prop.available_owner_approvals = fc::json::to_string(proposal_object->available_owner_approvals); - prop.required_active_approvals = fc::json::to_string(proposal_object->required_active_approvals); - prop.available_key_approvals = fc::json::to_string(proposal_object->available_key_approvals); - prop.proposer = proposal_object->proposer; + prop.expiration_time = proposal_object.expiration_time; + prop.review_period_time = proposal_object.review_period_time; + prop.proposed_transaction = fc::json::to_string(proposal_object.proposed_transaction); + prop.required_owner_approvals = fc::json::to_string(proposal_object.required_owner_approvals); + prop.available_owner_approvals = fc::json::to_string(proposal_object.available_owner_approvals); + prop.required_active_approvals = fc::json::to_string(proposal_object.required_active_approvals); + prop.available_key_approvals = fc::json::to_string(proposal_object.available_key_approvals); + prop.proposer = proposal_object.proposer; + + auto it = proposals.find(proposal_object.id); + if(it == proposals.end()) + proposals[proposal_object.id] = prop; + else { + if(it->second == prop) return; + else proposals[proposal_object.id] = prop; + } std::string data = fc::json::to_string(prop); - prepare = graphene::utilities::createBulk("bitshares-proposal", data, "", 1); + + fc::mutable_variant_object bulk_header; + bulk_header["_index"] = _es_objects_index_prefix + "proposal"; + bulk_header["_type"] = "data"; + + prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } -void es_objects_plugin_impl::PrepareAccount(const account_object* account_object, const fc::time_point_sec block_time, uint32_t block_number) +void es_objects_plugin_impl::PrepareAccount(const account_object& account_object, + const fc::time_point_sec& block_time, const uint32_t& block_number) { account_struct acct; - acct.object_id = account_object->id; + acct.object_id = account_object.id; acct.block_time = block_time; acct.block_number = block_number; - acct.membership_expiration_date = account_object->membership_expiration_date; - acct.registrar = account_object->registrar; - acct.referrer = account_object->referrer; - acct.lifetime_referrer = account_object->lifetime_referrer; - acct.network_fee_percentage = account_object->network_fee_percentage; - acct.lifetime_referrer_fee_percentage = account_object->lifetime_referrer_fee_percentage; - acct.referrer_rewards_percentage = account_object->referrer_rewards_percentage; - acct.name = account_object->name; - acct.owner_account_auths = fc::json::to_string(account_object->owner.account_auths); - acct.owner_key_auths = fc::json::to_string(account_object->owner.key_auths); - acct.owner_address_auths = fc::json::to_string(account_object->owner.address_auths); - acct.active_account_auths = fc::json::to_string(account_object->active.account_auths); - acct.active_key_auths = fc::json::to_string(account_object->active.key_auths); - acct.active_address_auths = fc::json::to_string(account_object->active.address_auths); - acct.voting_account = account_object->options.voting_account; + acct.membership_expiration_date = account_object.membership_expiration_date; + acct.registrar = account_object.registrar; + acct.referrer = account_object.referrer; + acct.lifetime_referrer = account_object.lifetime_referrer; + acct.network_fee_percentage = account_object.network_fee_percentage; + acct.lifetime_referrer_fee_percentage = account_object.lifetime_referrer_fee_percentage; + acct.referrer_rewards_percentage = account_object.referrer_rewards_percentage; + acct.name = account_object.name; + acct.owner_account_auths = fc::json::to_string(account_object.owner.account_auths); + acct.owner_key_auths = fc::json::to_string(account_object.owner.key_auths); + acct.owner_address_auths = fc::json::to_string(account_object.owner.address_auths); + acct.active_account_auths = fc::json::to_string(account_object.active.account_auths); + acct.active_key_auths = fc::json::to_string(account_object.active.key_auths); + acct.active_address_auths = fc::json::to_string(account_object.active.address_auths); + acct.voting_account = account_object.options.voting_account; + + auto it = accounts.find(account_object.id); + if(it == accounts.end()) + accounts[account_object.id] = acct; + else { + if(it->second == acct) return; + else accounts[account_object.id] = acct; + } std::string data = fc::json::to_string(acct); - prepare = graphene::utilities::createBulk("bitshares-account", data, "", 1); + + fc::mutable_variant_object bulk_header; + bulk_header["_index"] = _es_objects_index_prefix + "acount"; + bulk_header["_type"] = "data"; + + prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } -void es_objects_plugin_impl::PrepareAsset(const asset_object* asset_object, const fc::time_point_sec block_time, uint32_t block_number) +void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object, + const fc::time_point_sec& block_time, const uint32_t& block_number) { - asset_struct _asset; - _asset.object_id = asset_object->id; - _asset.block_time = block_time; - _asset.block_number = block_number; - _asset.symbol = asset_object->symbol; - _asset.issuer = asset_object->issuer; - _asset.is_market_issued = asset_object->is_market_issued(); - _asset.dynamic_asset_data_id = asset_object->dynamic_asset_data_id; - _asset.bitasset_data_id = asset_object->bitasset_data_id; + asset_struct asset; + asset.object_id = asset_object.id; + asset.block_time = block_time; + asset.block_number = block_number; + asset.symbol = asset_object.symbol; + asset.issuer = asset_object.issuer; + asset.is_market_issued = asset_object.is_market_issued(); + asset.dynamic_asset_data_id = asset_object.dynamic_asset_data_id; + asset.bitasset_data_id = asset_object.bitasset_data_id; - std::string data = fc::json::to_string(_asset); - prepare = graphene::utilities::createBulk("bitshares-asset", data, fc::json::to_string(asset_object->id), 0); + auto it = assets.find(asset_object.id); + if(it == assets.end()) + assets[asset_object.id] = asset; + else { + if(it->second == asset) return; + else assets[asset_object.id] = asset; + } + + std::string data = fc::json::to_string(asset); + + fc::mutable_variant_object bulk_header; + bulk_header["_index"] = _es_objects_index_prefix + "asset"; + bulk_header["_type"] = "data"; + + prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } -void es_objects_plugin_impl::PrepareBalance(const balance_object* balance_object, const fc::time_point_sec block_time, uint32_t block_number) +void es_objects_plugin_impl::PrepareBalance(const balance_object& balance_object, + const fc::time_point_sec& block_time, const uint32_t& block_number) { balance_struct balance; - balance.object_id = balance_object->id; + balance.object_id = balance_object.id; balance.block_time = block_time; - balance.block_number = block_number;balance.owner = balance_object->owner; - balance.asset_id = balance_object->balance.asset_id; - balance.amount = balance_object->balance.amount; + balance.block_number = block_number;balance.owner = balance_object.owner; + balance.asset_id = balance_object.balance.asset_id; + balance.amount = balance_object.balance.amount; + + auto it = balances.find(balance_object.id); + if(it == balances.end()) + balances[balance_object.id] = balance; + else { + if(it->second == balance) return; + else balances[balance_object.id] = balance; + } std::string data = fc::json::to_string(balance); - prepare = graphene::utilities::createBulk("bitshares-balance", data, "", 1); + + fc::mutable_variant_object bulk_header; + bulk_header["_index"] = _es_objects_index_prefix + "balance"; + bulk_header["_type"] = "data"; + + prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } -void es_objects_plugin_impl::PrepareLimit(const limit_order_object* limit_object, const fc::time_point_sec block_time, uint32_t block_number) +void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object, + const fc::time_point_sec& block_time, const uint32_t& block_number) { limit_order_struct limit; - limit.object_id = limit_object->id; + limit.object_id = limit_object.id; limit.block_time = block_time; limit.block_number = block_number; - limit.expiration = limit_object->expiration; - limit.seller = limit_object->seller; - limit.for_sale = limit_object->for_sale; - limit.sell_price = limit_object->sell_price; - limit.deferred_fee = limit_object->deferred_fee; + limit.expiration = limit_object.expiration; + limit.seller = limit_object.seller; + limit.for_sale = limit_object.for_sale; + limit.sell_price = limit_object.sell_price; + limit.deferred_fee = limit_object.deferred_fee; + + auto it = limit_orders.find(limit_object.id); + if(it == limit_orders.end()) + limit_orders[limit_object.id] = limit; + else { + if(it->second == limit) return; + else limit_orders[limit_object.id] = limit; + } std::string data = fc::json::to_string(limit); - prepare = graphene::utilities::createBulk("bitshares-limitorder", data, "", 1); + + fc::mutable_variant_object bulk_header; + bulk_header["_index"] = _es_objects_index_prefix + "limitorder"; + bulk_header["_type"] = "data"; + + prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } -void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object* bitasset_object, const fc::time_point_sec block_time, uint32_t block_number) +void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, + const fc::time_point_sec& block_time, const uint32_t& block_number) { - if(!bitasset_object->is_prediction_market) { - - auto object_id = bitasset_object->id; - auto it = bitassets.find(object_id); - if(it == bitassets.end()) - bitassets[object_id] = fc::json::to_string(bitasset_object->current_feed); - else { - if(it->second == fc::json::to_string(bitasset_object->current_feed)) return; - else bitassets[object_id] = fc::json::to_string(bitasset_object->current_feed); - } + if(!bitasset_object.is_prediction_market) { bitasset_struct bitasset; - - bitasset.object_id = bitasset_object->id; + bitasset.object_id = bitasset_object.id; bitasset.block_time = block_time; bitasset.block_number = block_number; - bitasset.current_feed = fc::json::to_string(bitasset_object->current_feed); - bitasset.current_feed_publication_time = bitasset_object->current_feed_publication_time; + bitasset.current_feed = fc::json::to_string(bitasset_object.current_feed); + bitasset.current_feed_publication_time = bitasset_object.current_feed_publication_time; + + auto it = bitassets.find(bitasset_object.id); + if(it == bitassets.end()) + bitassets[bitasset_object.id] = bitasset; + else { + if(it->second == bitasset) return; + else bitassets[bitasset_object.id] = bitasset; + } std::string data = fc::json::to_string(bitasset); - prepare = graphene::utilities::createBulk("bitshares-bitasset", data, "", 1); + + fc::mutable_variant_object bulk_header; + bulk_header["_index"] = _es_objects_index_prefix + "bitasset"; + bulk_header["_type"] = "data"; + + prepare = graphene::utilities::createBulk(bulk_header, data); bulk.insert(bulk.end(), prepare.begin(), prepare.end()); prepare.clear(); } @@ -268,7 +356,6 @@ es_objects_plugin_impl::~es_objects_plugin_impl() return; } - } // end namespace detail es_objects_plugin::es_objects_plugin() : @@ -296,7 +383,7 @@ void es_objects_plugin::plugin_set_program_options( { cli.add_options() ("es-objects-elasticsearch-url", boost::program_options::value(), "Elasticsearch node url") - ("es-objects-logs", boost::program_options::value(), "Log bulk events to database") + ("es-objects-auth", boost::program_options::value(), "Basic auth username:password") ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(5000)") ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a syncronied chain(10)") ("es-objects-proposals", boost::program_options::value(), "Store proposal objects") @@ -305,21 +392,30 @@ void es_objects_plugin::plugin_set_program_options( ("es-objects-balances", boost::program_options::value(), "Store balances objects") ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects") ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data") - + ("es-objects-index-prefix", boost::program_options::value(), "Add a prefix to the index(objects-)") ; cfg.add(cli); } void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options) { - database().new_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ){ my->updateDatabase(ids, 1); }); - database().changed_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ){ my->updateDatabase(ids, 0); }); - + database().new_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { + if(!my->updateDatabase(ids, 1)) + { + FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); + } + }); + database().changed_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ) { + if(!my->updateDatabase(ids, 0)) + { + FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying."); + } + }); if (options.count("es-objects-elasticsearch-url")) { my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); } - if (options.count("es-objects-logs")) { - my->_es_objects_logs = options["es-objects-logs"].as(); + if (options.count("es-objects-auth")) { + my->_es_objects_auth = options["es-objects-auth"].as(); } if (options.count("es-objects-bulk-replay")) { my->_es_objects_bulk_replay = options["es-objects-bulk-replay"].as(); @@ -345,11 +441,22 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable if (options.count("es-objects-asset-bitasset")) { my->_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as(); } + if (options.count("es-objects-index-prefix")) { + my->_es_objects_index_prefix = options["es-objects-index-prefix"].as(); + } } void es_objects_plugin::plugin_startup() { - ilog("elasticsearch objects: plugin_startup() begin"); + graphene::utilities::ES es; + es.curl = my->curl; + es.elasticsearch_url = my->_es_objects_elasticsearch_url; + es.auth = my->_es_objects_auth; + es.auth = my->_es_objects_index_prefix; + + if(!graphene::utilities::checkES(es)) + FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_es_objects_elasticsearch_url)); + ilog("elasticsearch OBJECTS: plugin_startup() begin"); } -} } \ No newline at end of file +} } diff --git a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp index 31809e04..d9c38711 100644 --- a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp +++ b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp @@ -68,6 +68,20 @@ struct proposal_struct { string available_key_approvals; account_id_type proposer; + friend bool operator==(const proposal_struct& l, const proposal_struct& r) + { + return std::tie(l.object_id, l.block_time, l.block_number, l.expiration_time, l.review_period_time, + l.proposed_transaction, l.required_active_approvals, l.available_active_approvals, + l.required_owner_approvals, l.available_owner_approvals, l.available_key_approvals, + l.proposer) == std::tie(r.object_id, r.block_time, r.block_number, r.expiration_time, r.review_period_time, + r.proposed_transaction, r.required_active_approvals, r.available_active_approvals, + r.required_owner_approvals, r.available_owner_approvals, r.available_key_approvals, + r.proposer); + } + friend bool operator!=(const proposal_struct& l, const proposal_struct& r) + { + return !operator==(l, r); + } }; struct account_struct { object_id_type object_id; @@ -88,6 +102,23 @@ struct account_struct { string active_key_auths; string active_address_auths; account_id_type voting_account; + + friend bool operator==(const account_struct& l, const account_struct& r) + { + return std::tie(l.object_id, l.block_time, l.block_number, l.membership_expiration_date, l.registrar, l.referrer, + l.lifetime_referrer, l.network_fee_percentage, l.lifetime_referrer_fee_percentage, + l.referrer_rewards_percentage, l.name, l.owner_account_auths, l.owner_key_auths, + l.owner_address_auths, l.active_account_auths, l.active_key_auths, l.active_address_auths, + l.voting_account) == std::tie(r.object_id, r.block_time, r.block_number, r.membership_expiration_date, r.registrar, r.referrer, + r.lifetime_referrer, r.network_fee_percentage, r.lifetime_referrer_fee_percentage, + r.referrer_rewards_percentage, r.name, r.owner_account_auths, r.owner_key_auths, + r.owner_address_auths, r.active_account_auths, r.active_key_auths, r.active_address_auths, + r.voting_account); + } + friend bool operator!=(const account_struct& l, const account_struct& r) + { + return !operator==(l, r); + } }; struct asset_struct { object_id_type object_id; @@ -99,6 +130,17 @@ struct asset_struct { asset_dynamic_data_id_type dynamic_asset_data_id; optional bitasset_data_id; + friend bool operator==(const asset_struct& l, const asset_struct& r) + { + return std::tie(l.object_id, l.block_time, l.block_number, l.symbol, l.issuer, l.is_market_issued, + l.dynamic_asset_data_id, l.bitasset_data_id) == std::tie(r.object_id, r.block_time, + r.block_number, r.symbol, r.issuer, r.is_market_issued, r.dynamic_asset_data_id, + r.bitasset_data_id); + } + friend bool operator!=(const asset_struct& l, const asset_struct& r) + { + return !operator==(l, r); + } }; struct balance_struct { object_id_type object_id; @@ -107,6 +149,16 @@ struct balance_struct { address owner; asset_id_type asset_id; share_type amount; + + friend bool operator==(const balance_struct& l, const balance_struct& r) + { + return std::tie(l.object_id, l.block_time, l.block_number, l.block_time, l.owner, l.asset_id, l.amount) + == std::tie(r.object_id, r.block_time, r.block_number, r.block_time, r.owner, r.asset_id, r.amount); + } + friend bool operator!=(const balance_struct& l, const balance_struct& r) + { + return !operator==(l, r); + } }; struct limit_order_struct { object_id_type object_id; @@ -117,6 +169,16 @@ struct limit_order_struct { share_type for_sale; price sell_price; share_type deferred_fee; + + friend bool operator==(const limit_order_struct& l, const limit_order_struct& r) + { + return std::tie(l.object_id, l.block_time, l.block_number, l.expiration, l.seller, l.for_sale, l.sell_price, l.deferred_fee) + == std::tie(r.object_id, r.block_time, r.block_number, r.expiration, r.seller, r.for_sale, r.sell_price, r.deferred_fee); + } + friend bool operator!=(const limit_order_struct& l, const limit_order_struct& r) + { + return !operator==(l, r); + } }; struct bitasset_struct { object_id_type object_id; @@ -125,6 +187,16 @@ struct bitasset_struct { string current_feed; time_point_sec current_feed_publication_time; time_point_sec feed_expiration_time; + + friend bool operator==(const bitasset_struct& l, const bitasset_struct& r) + { + return std::tie(l.object_id, l.block_time, l.block_number, l.current_feed, l.current_feed_publication_time) + == std::tie(r.object_id, r.block_time, r.block_number, r.current_feed, r.current_feed_publication_time); + } + friend bool operator!=(const bitasset_struct& l, const bitasset_struct& r) + { + return !operator==(l, r); + } }; } } //graphene::es_objects @@ -134,4 +206,4 @@ FC_REFLECT( graphene::es_objects::account_struct, (object_id)(block_time)(block_ FC_REFLECT( graphene::es_objects::asset_struct, (object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id) ) FC_REFLECT( graphene::es_objects::balance_struct, (object_id)(block_time)(block_number)(block_time)(owner)(asset_id)(amount) ) FC_REFLECT( graphene::es_objects::limit_order_struct, (object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee) ) -FC_REFLECT( graphene::es_objects::bitasset_struct, (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) ) \ No newline at end of file +FC_REFLECT( graphene::es_objects::bitasset_struct, (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) ) diff --git a/libraries/utilities/elasticsearch.cpp b/libraries/utilities/elasticsearch.cpp index 1674a12a..11a9561b 100644 --- a/libraries/utilities/elasticsearch.cpp +++ b/libraries/utilities/elasticsearch.cpp @@ -24,100 +24,167 @@ #include #include +#include #include +#include + +size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) +{ + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; +} namespace graphene { namespace utilities { -bool SendBulk(CURL *curl, std::vector& bulk, std::string elasticsearch_url, bool do_logs, std::string logs_index) +bool checkES(ES& es) { - // curl buffers to read - std::string readBuffer; - std::string readBuffer_logs; + graphene::utilities::CurlRequest curl_request; + curl_request.handler = es.curl; + curl_request.url = es.elasticsearch_url + "_nodes"; + curl_request.auth = es.auth; + curl_request.type = "GET"; - std::string bulking = ""; + if(doCurl(curl_request).empty()) + return false; + return true; - bulking = boost::algorithm::join(bulk, "\n"); - bulking = bulking + "\n"; - bulk.clear(); +} +const std::string simpleQuery(ES& es) +{ + graphene::utilities::CurlRequest curl_request; + curl_request.handler = es.curl; + curl_request.url = es.elasticsearch_url + es.endpoint; + curl_request.auth = es.auth; + curl_request.type = "POST"; + curl_request.query = es.query; - struct curl_slist *headers = NULL; - headers = curl_slist_append(headers, "Content-Type: application/json"); - std::string url = elasticsearch_url + "_bulk"; - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_POST, true); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, bulking.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&readBuffer); - curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); - //curl_easy_setopt(curl, CURLOPT_VERBOSE, true); - curl_easy_perform(curl); - - long http_code = 0; - curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); - if(http_code == 200) { - // all good, do nothing - } - else if(http_code == 413) { - elog("413 error: Can be low space disk"); - return 0; - } - else { - elog(http_code + "error: Unknown error"); - return 0; - } - - if(do_logs) { - auto logs = readBuffer; - // do logs - std::string url_logs = elasticsearch_url + logs_index + "/data/"; - curl_easy_setopt(curl, CURLOPT_URL, url_logs.c_str()); - curl_easy_setopt(curl, CURLOPT_POST, true); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, logs.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &readBuffer_logs); - curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); - curl_easy_perform(curl); - - http_code = 0; - curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); - if(http_code == 200) { - // all good, do nothing - return 1; - } - else if(http_code == 201) { - // 201 is ok - return 1; - } - else if(http_code == 409) { - // 409 for record already exist is ok - return 1; - } - else if(http_code == 413) { - elog("413 error: Can be low space disk"); - return 0; - } - else { - elog(http_code + "error: Unknown error"); - return 0; - } - } - return 0; + return doCurl(curl_request); } -std::vector createBulk(std::string index_name, std::string data, std::string id, bool onlycreate) +bool SendBulk(ES& es) +{ + std::string bulking = joinBulkLines(es.bulk_lines); + + graphene::utilities::CurlRequest curl_request; + curl_request.handler = es.curl; + curl_request.url = es.elasticsearch_url + "_bulk"; + curl_request.auth = es.auth; + curl_request.type = "POST"; + curl_request.query = bulking; + + auto curlResponse = doCurl(curl_request); + + if(handleBulkResponse(getResponseCode(curl_request.handler), curlResponse)) + return true; + return false; +} + +const std::string joinBulkLines(const std::vector& bulk) +{ + auto bulking = boost::algorithm::join(bulk, "\n"); + bulking = bulking + "\n"; + + return bulking; +} +long getResponseCode(CURL *handler) +{ + long http_code = 0; + curl_easy_getinfo (handler, CURLINFO_RESPONSE_CODE, &http_code); + return http_code; +} + +bool handleBulkResponse(long http_code, const std::string& CurlReadBuffer) +{ + if(http_code == 200) { + // all good, but check errors in response + fc::variant j = fc::json::from_string(CurlReadBuffer); + bool errors = j["errors"].as_bool(); + if(errors == true) { + return false; + } + } + else { + if(http_code == 413) { + elog( "413 error: Can be low disk space" ); + } + else if(http_code == 401) { + elog( "401 error: Unauthorized" ); + } + else { + elog( std::to_string(http_code) + " error: Unknown error" ); + } + return false; + } + return true; +} + +const std::vector createBulk(const fc::mutable_variant_object& bulk_header, const std::string& data) { std::vector bulk; - std::string create_string = ""; - if(!onlycreate) - create_string = ",\"_id\" : "+id; - - bulk.push_back("{ \"index\" : { \"_index\" : \""+index_name+"\", \"_type\" : \"data\" "+create_string+" } }"); + fc::mutable_variant_object final_bulk_header; + final_bulk_header["index"] = bulk_header; + bulk.push_back(fc::json::to_string(final_bulk_header)); bulk.push_back(data); return bulk; } +bool deleteAll(ES& es) +{ + graphene::utilities::CurlRequest curl_request; + curl_request.handler = es.curl; + curl_request.url = es.elasticsearch_url + es.index_prefix + "*"; + curl_request.auth = es.auth; + curl_request.type = "DELETE"; + + auto curl_response = doCurl(curl_request); + if(curl_response.empty()) + return false; + else + return true; +} +const std::string getEndPoint(ES& es) +{ + graphene::utilities::CurlRequest curl_request; + curl_request.handler = es.curl; + curl_request.url = es.elasticsearch_url + es.endpoint; + curl_request.auth = es.auth; + curl_request.type = "GET"; + + return doCurl(curl_request); +} + +const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix) +{ + auto block_date_string = block_date.to_iso_string(); + std::vector parts; + boost::split(parts, block_date_string, boost::is_any_of("-")); + std::string index_name = _elasticsearch_index_prefix + parts[0] + "-" + parts[1]; + return index_name; +} + +const std::string doCurl(CurlRequest& curl) +{ + std::string CurlReadBuffer; + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "Content-Type: application/json"); + + curl_easy_setopt(curl.handler, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl.handler, CURLOPT_URL, curl.url.c_str()); + curl_easy_setopt(curl.handler, CURLOPT_CUSTOMREQUEST, curl.type.c_str()); + if(curl.type == "POST") + { + curl_easy_setopt(curl.handler, CURLOPT_POST, true); + curl_easy_setopt(curl.handler, CURLOPT_POSTFIELDS, curl.query.c_str()); + } + curl_easy_setopt(curl.handler, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl.handler, CURLOPT_WRITEDATA, (void *)&CurlReadBuffer); + curl_easy_setopt(curl.handler, CURLOPT_USERAGENT, "libcrp/0.1"); + if(!curl.auth.empty()) + curl_easy_setopt(curl.handler, CURLOPT_USERPWD, curl.auth.c_str()); + curl_easy_perform(curl.handler); + + return CurlReadBuffer; +} } } // end namespace graphene::utilities diff --git a/libraries/utilities/include/graphene/utilities/elasticsearch.hpp b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp index 517f2345..898464b1 100644 --- a/libraries/utilities/include/graphene/utilities/elasticsearch.hpp +++ b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp @@ -27,16 +27,42 @@ #include #include +#include +#include -static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) -{ - ((std::string*)userp)->append((char*)contents, size * nmemb); - return size * nmemb; -} +size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp); namespace graphene { namespace utilities { - bool SendBulk(CURL *curl, std::vector & bulk, std::string elasticsearch_url, bool do_logs, std::string logs_index); - std::vector createBulk(std::string type, std::string data, std::string id, bool onlycreate); + class ES { + public: + CURL *curl; + std::vector bulk_lines; + std::string elasticsearch_url; + std::string index_prefix; + std::string auth; + std::string endpoint; + std::string query; + }; + class CurlRequest { + public: + CURL *handler; + std::string url; + std::string type; + std::string auth; + std::string query; + }; + + bool SendBulk(ES& es); + const std::vector createBulk(const fc::mutable_variant_object& bulk_header, const std::string& data); + bool checkES(ES& es); + const std::string simpleQuery(ES& es); + bool deleteAll(ES& es); + bool handleBulkResponse(long http_code, const std::string& CurlReadBuffer); + const std::string getEndPoint(ES& es); + const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix); + const std::string doCurl(CurlRequest& curl); + const std::string joinBulkLines(const std::vector& bulk); + long getResponseCode(CURL *handler); } } // end namespace graphene::utilities diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 44af778b..e57e3374 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,38 +8,38 @@ endif() file(GLOB UNIT_TESTS "tests/*.cpp") add_executable( chain_test ${UNIT_TESTS} ${COMMON_SOURCES} ) -target_link_libraries( chain_test graphene_chain graphene_app graphene_account_history graphene_bookie graphene_egenesis_none fc graphene_wallet ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( chain_test graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_bookie graphene_egenesis_none fc graphene_wallet ${PLATFORM_SPECIFIC_LIBS} ) if(MSVC) set_source_files_properties( tests/serialization_tests.cpp PROPERTIES COMPILE_FLAGS "/bigobj" ) endif(MSVC) file(GLOB PERFORMANCE_TESTS "performance/*.cpp") add_executable( performance_test ${PERFORMANCE_TESTS} ${COMMON_SOURCES} ) -target_link_libraries( performance_test graphene_chain graphene_app graphene_account_history graphene_bookie graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( performance_test graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_bookie graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) file(GLOB BENCH_MARKS "benchmarks/*.cpp") add_executable( chain_bench ${BENCH_MARKS} ${COMMON_SOURCES} ) -target_link_libraries( chain_bench graphene_chain graphene_app graphene_account_history graphene_bookie graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( chain_bench graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_bookie graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) file(GLOB APP_SOURCES "app/*.cpp") add_executable( app_test ${APP_SOURCES} ) -target_link_libraries( app_test graphene_app graphene_account_history graphene_bookie graphene_net graphene_chain graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( app_test graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_witness graphene_bookie graphene_net graphene_chain graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) file(GLOB INTENSE_SOURCES "intense/*.cpp") add_executable( intense_test ${INTENSE_SOURCES} ${COMMON_SOURCES} ) -target_link_libraries( intense_test graphene_chain graphene_app graphene_account_history graphene_bookie graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( intense_test graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_bookie graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) file(GLOB BETTING_TESTS "betting/*.cpp") add_executable( betting_test ${BETTING_TESTS} ${COMMON_SOURCES} ) -target_link_libraries( betting_test graphene_chain graphene_app graphene_account_history graphene_bookie graphene_egenesis_none fc graphene_wallet ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( betting_test graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_bookie graphene_egenesis_none fc graphene_wallet ${PLATFORM_SPECIFIC_LIBS} ) file(GLOB TOURNAMENT_TESTS "tournament/*.cpp") add_executable( tournament_test ${TOURNAMENT_TESTS} ${COMMON_SOURCES} ) -target_link_libraries( tournament_test graphene_chain graphene_app graphene_account_history graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( tournament_test graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) file(GLOB RANDOM_SOURCES "random/*.cpp") add_executable( random_test ${RANDOM_SOURCES} ${COMMON_SOURCES} ) -target_link_libraries( random_test graphene_chain graphene_app graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( random_test graphene_chain graphene_app graphene_elasticsearch graphene_es_objects graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) file(GLOB CLI_SOURCES "cli/*.cpp") add_executable( cli_test ${CLI_SOURCES} ) @@ -51,4 +51,8 @@ if(MSVC) set_source_files_properties( cli/main.cpp PROPERTIES COMPILE_FLAGS "/bigobj" ) endif(MSVC) +file(GLOB ES_SOURCES "elasticsearch/*.cpp") +add_executable( es_test ${ES_SOURCES} ${COMMON_SOURCES} ) +target_link_libraries( es_test graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} ) + add_subdirectory( generate_empty_blocks ) diff --git a/tests/common/database_fixture.cpp b/tests/common/database_fixture.cpp index 0728ce2d..8cd3dc40 100644 --- a/tests/common/database_fixture.cpp +++ b/tests/common/database_fixture.cpp @@ -29,11 +29,9 @@ #include #include #include +#include +#include -#include - -#include -#include #include #include #include @@ -51,9 +49,7 @@ #include #include -#include #include -#include #include "database_fixture.hpp" @@ -134,8 +130,46 @@ database_fixture::database_fixture() } // app.initialize(); - ahplugin->plugin_set_app(&app); - ahplugin->plugin_initialize(options); + + auto test_name = boost::unit_test::framework::current_test_case().p_name.value; + if(test_name == "elasticsearch_account_history" || test_name == "elasticsearch_suite") { + auto esplugin = app.register_plugin(); + esplugin->plugin_set_app(&app); + + options.insert(std::make_pair("elasticsearch-node-url", boost::program_options::variable_value(string("http://localhost:9200/"), false))); + options.insert(std::make_pair("elasticsearch-bulk-replay", boost::program_options::variable_value(uint32_t(2), false))); + options.insert(std::make_pair("elasticsearch-bulk-sync", boost::program_options::variable_value(uint32_t(2), false))); + options.insert(std::make_pair("elasticsearch-visitor", boost::program_options::variable_value(true, false))); + //options.insert(std::make_pair("elasticsearch-basic-auth", boost::program_options::variable_value(string("elastic:changeme"), false))); + + esplugin->plugin_initialize(options); + esplugin->plugin_startup(); + } + else { + auto ahplugin = app.register_plugin(); + ahplugin->plugin_set_app(&app); + ahplugin->plugin_initialize(options); + ahplugin->plugin_startup(); + } + + if(test_name == "elasticsearch_objects" || test_name == "elasticsearch_suite") { + auto esobjects_plugin = app.register_plugin(); + esobjects_plugin->plugin_set_app(&app); + + options.insert(std::make_pair("es-objects-elasticsearch-url", boost::program_options::variable_value(string("http://localhost:9200/"), false))); + options.insert(std::make_pair("es-objects-bulk-replay", boost::program_options::variable_value(uint32_t(2), false))); + options.insert(std::make_pair("es-objects-bulk-sync", boost::program_options::variable_value(uint32_t(2), false))); + options.insert(std::make_pair("es-objects-proposals", boost::program_options::variable_value(true, false))); + options.insert(std::make_pair("es-objects-accounts", boost::program_options::variable_value(true, false))); + options.insert(std::make_pair("es-objects-assets", boost::program_options::variable_value(true, false))); + options.insert(std::make_pair("es-objects-balances", boost::program_options::variable_value(true, false))); + options.insert(std::make_pair("es-objects-limit-orders", boost::program_options::variable_value(true, false))); + options.insert(std::make_pair("es-objects-asset-bitasset", boost::program_options::variable_value(true, false))); + + esobjects_plugin->plugin_initialize(options); + esobjects_plugin->plugin_startup(); + } + mhplugin->plugin_set_app(&app); mhplugin->plugin_initialize(options); bookieplugin->plugin_set_app(&app); @@ -143,7 +177,6 @@ database_fixture::database_fixture() affiliateplugin->plugin_set_app(&app); affiliateplugin->plugin_initialize(options); - ahplugin->plugin_startup(); mhplugin->plugin_startup(); bookieplugin->plugin_startup(); affiliateplugin->plugin_startup(); diff --git a/tests/elasticsearch/main.cpp b/tests/elasticsearch/main.cpp new file mode 100644 index 00000000..18674a3b --- /dev/null +++ b/tests/elasticsearch/main.cpp @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2018 oxarbitrage and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include +#include + +#include + +#include "../common/database_fixture.hpp" + +#define BOOST_TEST_MODULE Elastic Search Database Tests +#include + +using namespace graphene::chain; +using namespace graphene::chain::test; +using namespace graphene::app; + +BOOST_FIXTURE_TEST_SUITE( elasticsearch_tests, database_fixture ) + +BOOST_AUTO_TEST_CASE(elasticsearch_account_history) { + try { + + CURL *curl; // curl handler + curl = curl_easy_init(); + + graphene::utilities::ES es; + es.curl = curl; + es.elasticsearch_url = "http://localhost:9200/"; + es.index_prefix = "bitshares-"; + //es.auth = "elastic:changeme"; + + // delete all first + auto delete_account_history = graphene::utilities::deleteAll(es); + fc::usleep(fc::milliseconds(1000)); // this is because index.refresh_interval, nothing to worry + + if(delete_account_history) { // all records deleted + + //account_id_type() do 3 ops + create_bitasset("USD", account_id_type()); + auto dan = create_account("dan"); + auto bob = create_account("bob"); + + generate_block(); + fc::usleep(fc::milliseconds(1000)); + + // for later use + //int asset_create_op_id = operation::tag::value; + //int account_create_op_id = operation::tag::value; + + string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }"; + es.endpoint = es.index_prefix + "*/data/_count"; + es.query = query; + + auto res = graphene::utilities::simpleQuery(es); + variant j = fc::json::from_string(res); + auto total = j["count"].as_string(); + BOOST_CHECK_EQUAL(total, "5"); + + es.endpoint = es.index_prefix + "*/data/_search"; + res = graphene::utilities::simpleQuery(es); + j = fc::json::from_string(res); + auto first_id = j["hits"]["hits"][size_t(0)]["_id"].as_string(); + BOOST_CHECK_EQUAL(first_id, "2.9.0"); + + generate_block(); + auto willie = create_account("willie"); + generate_block(); + + fc::usleep(fc::milliseconds(1000)); // index.refresh_interval + + es.endpoint = es.index_prefix + "*/data/_count"; + res = graphene::utilities::simpleQuery(es); + j = fc::json::from_string(res); + + total = j["count"].as_string(); + BOOST_CHECK_EQUAL(total, "7"); + + // do some transfers in 1 block + transfer(account_id_type()(db), bob, asset(100)); + transfer(account_id_type()(db), bob, asset(200)); + transfer(account_id_type()(db), bob, asset(300)); + + generate_block(); + fc::usleep(fc::milliseconds(1000)); // index.refresh_interval + + res = graphene::utilities::simpleQuery(es); + j = fc::json::from_string(res); + + total = j["count"].as_string(); + BOOST_CHECK_EQUAL(total, "13"); + + // check the visitor data + auto block_date = db.head_block_time(); + std::string index_name = graphene::utilities::generateIndexName(block_date, "bitshares-"); + + es.endpoint = index_name + "/data/2.9.12"; // we know last op is a transfer of amount 300 + res = graphene::utilities::getEndPoint(es); + j = fc::json::from_string(res); + auto last_transfer_amount = j["_source"]["additional_data"]["transfer_data"]["amount"].as_string(); + BOOST_CHECK_EQUAL(last_transfer_amount, "300"); + } + } + catch (fc::exception &e) { + edump((e.to_detail_string())); + throw; + } +} + +BOOST_AUTO_TEST_CASE(elasticsearch_objects) { + try { + + CURL *curl; // curl handler + curl = curl_easy_init(); + + graphene::utilities::ES es; + es.curl = curl; + es.elasticsearch_url = "http://localhost:9200/"; + es.index_prefix = "objects-"; + //es.auth = "elastic:changeme"; + + // delete all first + auto delete_objects = graphene::utilities::deleteAll(es); + + generate_block(); + fc::usleep(fc::milliseconds(1000)); + + if(delete_objects) { // all records deleted + + // asset and bitasset + create_bitasset("USD", account_id_type()); + generate_block(); + fc::usleep(fc::milliseconds(1000)); + + string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }"; + es.endpoint = es.index_prefix + "*/data/_count"; + es.query = query; + + auto res = graphene::utilities::simpleQuery(es); + variant j = fc::json::from_string(res); + auto total = j["count"].as_string(); + BOOST_CHECK_EQUAL(total, "2"); + + es.endpoint = es.index_prefix + "asset/data/_search"; + res = graphene::utilities::simpleQuery(es); + j = fc::json::from_string(res); + auto first_id = j["hits"]["hits"][size_t(0)]["_source"]["symbol"].as_string(); + BOOST_CHECK_EQUAL(first_id, "USD"); + + auto bitasset_data_id = j["hits"]["hits"][size_t(0)]["_source"]["bitasset_data_id"].as_string(); + es.endpoint = es.index_prefix + "bitasset/data/_search"; + es.query = "{ \"query\" : { \"bool\": { \"must\" : [{ \"term\": { \"object_id\": \""+bitasset_data_id+"\"}}] } } }"; + res = graphene::utilities::simpleQuery(es); + j = fc::json::from_string(res); + auto bitasset_object_id = j["hits"]["hits"][size_t(0)]["_source"]["object_id"].as_string(); + BOOST_CHECK_EQUAL(bitasset_object_id, bitasset_data_id); + } + } + catch (fc::exception &e) { + edump((e.to_detail_string())); + throw; + } +} + +BOOST_AUTO_TEST_CASE(elasticsearch_suite) { + try { + + CURL *curl; // curl handler + curl = curl_easy_init(); + + graphene::utilities::ES es; + es.curl = curl; + es.elasticsearch_url = "http://localhost:9200/"; + es.index_prefix = "bitshares-"; + auto delete_account_history = graphene::utilities::deleteAll(es); + fc::usleep(fc::milliseconds(1000)); + es.index_prefix = "objects-"; + auto delete_objects = graphene::utilities::deleteAll(es); + fc::usleep(fc::milliseconds(1000)); + + if(delete_account_history && delete_objects) { // all records deleted + + + } + } + catch (fc::exception &e) { + edump((e.to_detail_string())); + throw; + } +} + +BOOST_AUTO_TEST_SUITE_END()