diff --git a/CMakeLists.txt b/CMakeLists.txt index 4df202b3..bfe1f71c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,34 @@ endif() list( APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMakeModules" ) +# function to help with cUrl +macro(FIND_CURL) + if (NOT WIN32 AND NOT APPLE AND CURL_STATICLIB) + find_package(OpenSSL REQUIRED) + set (OLD_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) + set (CMAKE_FIND_LIBRARY_SUFFIXES .a) + find_package(CURL REQUIRED) + list(APPEND CURL_LIBRARIES ${OPENSSL_LIBRARIES} ${BOOST_THREAD_LIBRARY} ${CMAKE_DL_LIBS}) + set (CMAKE_FIND_LIBRARY_SUFFIXES ${OLD_SUFFIXES}) + else (NOT WIN32 AND NOT APPLE AND CURL_STATICLIB) + find_package(CURL REQUIRED) + endif (NOT WIN32 AND NOT APPLE AND CURL_STATICLIB) + + if( WIN32 ) + if ( MSVC ) + list( APPEND CURL_LIBRARIES Wldap32 ) + endif( MSVC ) + + if( MINGW ) + # MinGW requires a specific order of included libraries ( CURL before ZLib ) + find_package( ZLIB REQUIRED ) + list( APPEND CURL_LIBRARIES ${ZLIB_LIBRARY} pthread ) + endif( MINGW ) + + list( APPEND CURL_LIBRARIES ${PLATFORM_SPECIFIC_LIBS} ) + endif( WIN32 ) +endmacro() + set(CMAKE_EXPORT_COMPILE_COMMANDS "ON") set(GRAPHENE_EGENESIS_JSON "${CMAKE_CURRENT_SOURCE_DIR}/genesis.json" CACHE PATH "location of the genesis.json to embed in the executable" ) diff --git a/libraries/plugins/elasticsearch/CMakeLists.txt b/libraries/plugins/elasticsearch/CMakeLists.txt index 88e584b7..928d2e3c 100644 --- a/libraries/plugins/elasticsearch/CMakeLists.txt +++ b/libraries/plugins/elasticsearch/CMakeLists.txt @@ -4,13 +4,21 @@ add_library( graphene_elasticsearch elasticsearch_plugin.cpp ) -target_link_libraries( graphene_elasticsearch PRIVATE graphene_plugin curl ) -target_include_directories( graphene_elasticsearch - PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) +find_curl() +include_directories(${CURL_INCLUDE_DIRS}) if(MSVC) set_source_files_properties(elasticsearch_plugin.cpp PROPERTIES COMPILE_FLAGS "/bigobj" ) endif(MSVC) +if(CURL_STATICLIB) + SET_TARGET_PROPERTIES(graphene_elasticsearch PROPERTIES + COMPILE_DEFINITIONS "CURL_STATICLIB") +endif(CURL_STATICLIB) +target_link_libraries( graphene_elasticsearch PRIVATE graphene_plugin ${CURL_LIBRARIES} ) +target_include_directories( graphene_elasticsearch + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" + PUBLIC "${CURL_INCLUDE_DIR}" ) + install( TARGETS graphene_elasticsearch diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index dc167b05..8777c06d 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -57,9 +57,9 @@ class elasticsearch_plugin_impl bool _elasticsearch_visitor = false; std::string _elasticsearch_basic_auth = ""; std::string _elasticsearch_index_prefix = "peerplays-"; - bool _elasticsearch_operation_object = false; + bool _elasticsearch_operation_object = true; uint32_t _elasticsearch_start_es_after_block = 0; - bool _elasticsearch_operation_string = true; + bool _elasticsearch_operation_string = false; mode _elasticsearch_mode = mode::only_save; CURL *curl; // curl handler vector bulk_lines; // vector of op lines @@ -75,20 +75,19 @@ class elasticsearch_plugin_impl std::string bulk_line; std::string index_name; bool is_sync = false; - fc::time_point last_sync; private: bool add_elasticsearch( const account_id_type account_id, const optional& oho, const uint32_t block_number ); const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj, - const account_id_type account_id, + const account_id_type& account_id, const optional & oho); - const account_statistics_object& getStatsObject(const account_id_type account_id); + const account_statistics_object& getStatsObject(const account_id_type& account_id); void growStats(const account_statistics_object& stats_obj, const account_transaction_history_object& ath); void getOperationType(const optional & oho); void doOperationHistory(const optional & oho); - void doBlock(const optional & oho, const signed_block& b); + void doBlock(uint32_t trx_in_block, const signed_block& b); void doVisitor(const optional & oho); void checkState(const fc::time_point_sec& block_time); - void cleanObjects(const account_transaction_history_object& ath, account_id_type account_id); + void cleanObjects(const account_transaction_history_id_type& ath, const account_id_type& account_id); void createBulkLine(const account_transaction_history_object& ath); void prepareBulk(const account_transaction_history_id_type& ath_id); void populateESstruct(); @@ -148,7 +147,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b // populate what we can before impacted loop getOperationType(oho); doOperationHistory(oho); - doBlock(oho, b); + doBlock(oho->trx_in_block, b); if(_elasticsearch_visitor) doVisitor(oho); @@ -174,7 +173,11 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b for( auto& account_id : impacted ) { if(!add_elasticsearch( account_id, oho, b.block_num() )) + { + elog( "Error adding data to Elastic Search: block num ${b}, account ${a}, data ${d}", + ("b",b.block_num()) ("a",account_id) ("d", oho) ); return false; + } } } // we send bulk at end of block when we are in sync for better real time client experience @@ -185,23 +188,33 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b { prepare.clear(); if(!graphene::utilities::SendBulk(es)) + { + // Note: although called with `std::move()`, `es` is not updated in `SendBulk()` + elog( "Error sending ${n} lines of bulk data to Elastic Search, the first lines are:", + ("n",es.bulk_lines.size()) ); + for( size_t i = 0; i < es.bulk_lines.size() && i < 10; ++i ) + { + edump( (es.bulk_lines[i]) ); + } return false; + } else bulk_lines.clear(); } } + if(bulk_lines.size() != limit_documents) + bulk_lines.reserve(limit_documents); + return true; } void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time) { - fc::time_point current_time(fc::time_point::now()); - if(((current_time - block_time) < fc::seconds(30)) || (current_time - last_sync > fc::seconds(60))) + if((fc::time_point::now() - block_time) < fc::seconds(30)) { limit_documents = _elasticsearch_bulk_sync; is_sync = true; - last_sync = current_time; } else { @@ -232,11 +245,11 @@ void elasticsearch_plugin_impl::doOperationHistory(const optional op); } -void elasticsearch_plugin_impl::doBlock(const optional & oho, const signed_block& b) +void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b) { std::string trx_id = ""; - if(oho->trx_in_block < b.transactions.size()) - trx_id = b.transactions[oho->trx_in_block].id().str(); + if(trx_in_block < b.transactions.size()) + trx_id = b.transactions[trx_in_block].id().str(); bs.block_num = b.block_num(); bs.block_time = b.timestamp; bs.trx_id = trx_id; @@ -244,23 +257,41 @@ void elasticsearch_plugin_impl::doBlock(const optional & oho) { + graphene::chain::database& db = database(); + operation_visitor o_v; oho->op.visit(o_v); + auto fee_asset = o_v.fee_asset(db); vs.fee_data.asset = o_v.fee_asset; + vs.fee_data.asset_name = fee_asset.symbol; vs.fee_data.amount = o_v.fee_amount; + vs.fee_data.amount_units = (o_v.fee_amount.value)/(double)asset::scaled_precision(fee_asset.precision).value; + auto transfer_asset = o_v.transfer_asset_id(db); vs.transfer_data.asset = o_v.transfer_asset_id; + vs.transfer_data.asset_name = transfer_asset.symbol; vs.transfer_data.amount = o_v.transfer_amount; + vs.transfer_data.amount_units = (o_v.transfer_amount.value)/(double)asset::scaled_precision(transfer_asset.precision).value; vs.transfer_data.from = o_v.transfer_from; vs.transfer_data.to = o_v.transfer_to; + auto fill_pays_asset = o_v.fill_pays_asset_id(db); + auto fill_receives_asset = o_v.fill_receives_asset_id(db); vs.fill_data.order_id = o_v.fill_order_id; vs.fill_data.account_id = o_v.fill_account_id; vs.fill_data.pays_asset_id = o_v.fill_pays_asset_id; + vs.fill_data.pays_asset_name = fill_pays_asset.symbol; vs.fill_data.pays_amount = o_v.fill_pays_amount; + vs.fill_data.pays_amount_units = (o_v.fill_pays_amount.value)/(double)asset::scaled_precision(fill_pays_asset.precision).value; vs.fill_data.receives_asset_id = o_v.fill_receives_asset_id; + vs.fill_data.receives_asset_name = fill_receives_asset.symbol; vs.fill_data.receives_amount = o_v.fill_receives_amount; + vs.fill_data.receives_amount_units = (o_v.fill_receives_amount.value)/(double)asset::scaled_precision(fill_receives_asset.precision).value; + + auto fill_price = (o_v.fill_receives_amount.value/(double)asset::scaled_precision(fill_receives_asset.precision).value) / + (o_v.fill_pays_amount.value/(double)asset::scaled_precision(fill_pays_asset.precision).value); + vs.fill_data.fill_price_units = fill_price; //vs.fill_data.fill_price = o_v.fill_fill_price; //vs.fill_data.is_maker = o_v.fill_is_maker; } @@ -276,13 +307,22 @@ bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account createBulkLine(ath); prepareBulk(ath.id); } - cleanObjects(ath, account_id); + cleanObjects(ath.id, account_id); if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech prepare.clear(); populateESstruct(); if(!graphene::utilities::SendBulk(es)) + { + // Note: although called with `std::move()`, `es` is not updated in `SendBulk()` + elog( "Error sending ${n} lines of bulk data to Elastic Search, the first lines are:", + ("n",es.bulk_lines.size()) ); + for( size_t i = 0; i < es.bulk_lines.size() && i < 10; ++i ) + { + edump( (es.bulk_lines[i]) ); + } return false; + } else bulk_lines.clear(); } @@ -290,15 +330,16 @@ bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account return true; } -const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type account_id) +const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type& account_id) { graphene::chain::database& db = database(); - const auto &acct = db.get(account_id); - return acct.statistics(db); + const auto &stats_obj = db.get_account_stats_by_owner(account_id); + + return stats_obj; } const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry(const account_statistics_object& stats_obj, - const account_id_type account_id, + const account_id_type& account_id, const optional & oho) { graphene::chain::database& db = database(); @@ -340,19 +381,21 @@ void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id fc::mutable_variant_object bulk_header; bulk_header["_index"] = index_name; bulk_header["_type"] = "data"; - bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "." + ath_id.instance; - prepare = graphene::utilities::createBulk(bulk_header, bulk_line); - bulk_lines.insert(bulk_lines.end(), prepare.begin(), prepare.end()); + bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "." + + fc::to_string(ath_id.instance.value); + prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line)); + std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines)); + prepare.clear(); } -void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_object& ath, account_id_type account_id) +void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_id_type& ath_id, const account_id_type& account_id) { graphene::chain::database& db = database(); // remove everything except current object from ath const auto &his_idx = db.get_index_type(); const auto &by_seq_idx = his_idx.indices().get(); auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0)); - if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.id) { + if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath_id) { // if found, remove the entry const auto remove_op_id = itr->operation_id; const auto itr_remove = itr; @@ -377,9 +420,12 @@ void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_o void elasticsearch_plugin_impl::populateESstruct() { es.curl = curl; - es.bulk_lines = bulk_lines; + es.bulk_lines = std::move(bulk_lines); es.elasticsearch_url = _elasticsearch_node_url; es.auth = _elasticsearch_basic_auth; + es.index_prefix = _elasticsearch_index_prefix; + es.endpoint = ""; + es.query = ""; } } // end namespace detail @@ -421,11 +467,11 @@ void elasticsearch_plugin::plugin_set_program_options( ("elasticsearch-index-prefix", boost::program_options::value(), "Add a prefix to the index(peerplays-)") ("elasticsearch-operation-object", boost::program_options::value(), - "Save operation as object(false)") + "Save operation as object(true)") ("elasticsearch-start-es-after-block", boost::program_options::value(), "Start doing ES job after block(0)") ("elasticsearch-operation-string", boost::program_options::value(), - "Save operation as string. Needed to serve history api calls(true)") + "Save operation as string. Needed to serve history api calls(false)") ("elasticsearch-mode", boost::program_options::value(), "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0") ; @@ -467,18 +513,18 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia if (options.count("elasticsearch-mode")) { const auto option_number = options["elasticsearch-mode"].as(); if(option_number > mode::all) - FC_THROW_EXCEPTION(fc::exception, "Elasticsearch mode not valid"); + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid"); my->_elasticsearch_mode = static_cast(options["elasticsearch-mode"].as()); } if(my->_elasticsearch_mode != mode::only_query) { if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string) - FC_THROW_EXCEPTION(fc::exception, + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true"); database().applied_block.connect([this](const signed_block &b) { if (!my->update_account_histories(b)) - FC_THROW_EXCEPTION(fc::exception, + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying."); }); } @@ -563,13 +609,12 @@ vector elasticsearch_plugin::get_account_history( const auto response = graphene::utilities::simpleQuery(es); variant variant_response = fc::json::from_string(response); - const auto hits = variant_response["hits"]["total"]["value"]; + const auto hits = variant_response["hits"]["total"]; uint32_t size; if( hits.is_object() ) // ES-7 ? size = static_cast(hits["value"].as_uint64()); else // probably ES-6 size = static_cast(hits.as_uint64()); - size = std::min( size, limit ); for(unsigned i=0; i(), "Elasticsearch node url(http://localhost:9200/)") + ("es-objects-elasticsearch-url", boost::program_options::value(), + "Elasticsearch node url(http://localhost:9200/)") ("es-objects-auth", boost::program_options::value(), "Basic auth username:password('')") - ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(10000)") - ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a synchronized chain(100)") + ("es-objects-bulk-replay", boost::program_options::value(), + "Number of bulk documents to index on replay(10000)") + ("es-objects-bulk-sync", boost::program_options::value(), + "Number of bulk documents to index on a synchronized chain(100)") ("es-objects-proposals", boost::program_options::value(), "Store proposal objects(true)") ("es-objects-accounts", boost::program_options::value(), "Store account objects(true)") ("es-objects-assets", boost::program_options::value(), "Store asset objects(true)") ("es-objects-balances", boost::program_options::value(), "Store balances objects(true)") - ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects(true)") + ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects(false)") ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data(true)") - ("es-objects-index-prefix", boost::program_options::value(), "Add a prefix to the index(ppobjects-)") - ("es-objects-keep-only-current", boost::program_options::value(), "Keep only current state of the objects(true)") - ("es-objects-start-es-after-block", boost::program_options::value(), "Start doing ES job after block(0)") + ("es-objects-index-prefix", boost::program_options::value(), + "Add a prefix to the index(ppobjects-)") + ("es-objects-keep-only-current", boost::program_options::value(), + "Keep only current state of the objects(true)") + ("es-objects-start-es-after-block", boost::program_options::value(), + "Start doing ES job after block(0)") ; cfg.add(cli); } void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options) { - database().applied_block.connect([this](const signed_block &b) { - if(b.block_num() == 1) { - if (!my->genesis()) - FC_THROW_EXCEPTION(fc::exception, "Error populating genesis data."); - } - }); - - database().new_objects.connect([this]( const vector& ids, const flat_set& impacted_accounts ) { - if(!my->index_database(ids, "create")) - { - FC_THROW_EXCEPTION(fc::exception, "Error creating object from ES database, we are going to keep trying."); - } - }); - database().changed_objects.connect([this]( const vector& ids, const flat_set& impacted_accounts ) { - if(!my->index_database(ids, "update")) - { - FC_THROW_EXCEPTION(fc::exception, "Error updating object from ES database, we are going to keep trying."); - } - }); - database().removed_objects.connect([this](const vector& ids, const vector& objs, const flat_set& impacted_accounts) { - if(!my->index_database(ids, "delete")) - { - FC_THROW_EXCEPTION(fc::exception, "Error deleting object from ES database, we are going to keep trying."); - } - }); - - if (options.count("es-objects-elasticsearch-url")) { my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); } @@ -381,6 +359,37 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable if (options.count("es-objects-start-es-after-block")) { my->_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as(); } + + database().applied_block.connect([this](const signed_block &b) { + if(b.block_num() == 1 && my->_es_objects_start_es_after_block == 0) { + if (!my->genesis()) + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating genesis data."); + } + }); + database().new_objects.connect([this]( const vector& ids, + const flat_set& impacted_accounts ) { + if(!my->index_database(ids, "create")) + { + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, + "Error creating object from ES database, we are going to keep trying."); + } + }); + database().changed_objects.connect([this]( const vector& ids, + const flat_set& impacted_accounts ) { + if(!my->index_database(ids, "update")) + { + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, + "Error updating object from ES database, we are going to keep trying."); + } + }); + database().removed_objects.connect([this](const vector& ids, + const vector& objs, const flat_set& impacted_accounts) { + if(!my->index_database(ids, "delete")) + { + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, + "Error deleting object from ES database, we are going to keep trying."); + } + }); } void es_objects_plugin::plugin_startup() @@ -396,4 +405,4 @@ void es_objects_plugin::plugin_startup() ilog("elasticsearch OBJECTS: plugin_startup() begin"); } -} } +} } \ No newline at end of file