diff --git a/libraries/plugins/CMakeLists.txt b/libraries/plugins/CMakeLists.txt index b3fe52d2..58728a9e 100644 --- a/libraries/plugins/CMakeLists.txt +++ b/libraries/plugins/CMakeLists.txt @@ -10,3 +10,4 @@ add_subdirectory( generate_genesis ) add_subdirectory( generate_uia_sharedrop_genesis ) add_subdirectory( debug_witness ) add_subdirectory( snapshot ) +add_subdirectory( es_objects ) diff --git a/libraries/plugins/es_objects/CMakeLists.txt b/libraries/plugins/es_objects/CMakeLists.txt new file mode 100644 index 00000000..92e3d150 --- /dev/null +++ b/libraries/plugins/es_objects/CMakeLists.txt @@ -0,0 +1,23 @@ +file(GLOB HEADERS "include/graphene/es_objects/*.hpp") + +add_library( graphene_es_objects + es_objects.cpp + ) + +target_link_libraries( graphene_es_objects graphene_chain graphene_app curl ) +target_include_directories( graphene_es_objects + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) + +if(MSVC) + set_source_files_properties(es_objects.cpp PROPERTIES COMPILE_FLAGS "/bigobj" ) +endif(MSVC) + +install( TARGETS + graphene_es_objects + + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) +INSTALL( FILES ${HEADERS} DESTINATION "include/graphene/es_objects" ) + diff --git a/libraries/plugins/es_objects/es_objects.cpp b/libraries/plugins/es_objects/es_objects.cpp new file mode 100644 index 00000000..7c9c2b61 --- /dev/null +++ b/libraries/plugins/es_objects/es_objects.cpp @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2018 oxarbitrage, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include + +#include + +#include +#include +#include +#include + +#include + + +namespace graphene { namespace es_objects { + +namespace detail +{ + +class es_objects_plugin_impl +{ + public: + es_objects_plugin_impl(es_objects_plugin& _plugin) + : _self( _plugin ) + { curl = curl_easy_init(); } + virtual ~es_objects_plugin_impl(); + + void updateDatabase( const vector& ids , bool isNew); + + es_objects_plugin& _self; + std::string _es_objects_elasticsearch_url = "http://localhost:9200/"; + uint32_t _es_objects_bulk_replay = 5000; + uint32_t _es_objects_bulk_sync = 10; + bool _es_objects_proposals = true; + bool _es_objects_accounts = true; + bool _es_objects_assets = true; + bool _es_objects_balances = true; + bool _es_objects_limit_orders = true; + bool _es_objects_asset_bitasset = true; + bool _es_objects_logs = true; + CURL *curl; // curl handler + vector bulk; + vector prepare; + map bitassets; + //uint32_t bitasset_seq; + private: + void PrepareProposal(const proposal_object* proposal_object, const fc::time_point_sec block_time, uint32_t block_number); + void PrepareAccount(const account_object* account_object, const fc::time_point_sec block_time, uint32_t block_number); + void PrepareAsset(const asset_object* asset_object, const fc::time_point_sec block_time, uint32_t block_number); + void PrepareBalance(const balance_object* balance_object, const fc::time_point_sec block_time, uint32_t block_number); + void PrepareLimit(const limit_order_object* limit_object, const fc::time_point_sec block_time, uint32_t block_number); + void PrepareBitAsset(const asset_bitasset_data_object* bitasset_object, const fc::time_point_sec block_time, uint32_t block_number); +}; + +void es_objects_plugin_impl::updateDatabase( const vector& ids , bool isNew) +{ + + graphene::chain::database &db = _self.database(); + + const fc::time_point_sec block_time = db.head_block_time(); + const uint32_t block_number = db.head_block_num(); + + // check if we are in replay or in sync and change number of bulk documents accordingly + uint32_t limit_documents = 0; + if((fc::time_point::now() - block_time) < fc::seconds(30)) + limit_documents = _es_objects_bulk_sync; + else + limit_documents = _es_objects_bulk_replay; + + if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech + if(!graphene::utilities::SendBulk(curl, bulk, _es_objects_elasticsearch_url, _es_objects_logs, "objects_logs")) + elog("Error sending data to database"); + bulk.clear(); + } + + for(auto const& value: ids) { + if(value.is() && _es_objects_proposals) { + auto obj = db.find_object(value); + auto p = static_cast(obj); + if(p != nullptr) + PrepareProposal(p, block_time, block_number); + } + else if(value.is() && _es_objects_accounts) { + auto obj = db.find_object(value); + auto a = static_cast(obj); + if(a != nullptr) + PrepareAccount(a, block_time, block_number); + } + else if(value.is() && _es_objects_assets) { + auto obj = db.find_object(value); + auto a = static_cast(obj); + if(a != nullptr) + PrepareAsset(a, block_time, block_number); + } + else if(value.is() && _es_objects_balances) { + auto obj = db.find_object(value); + auto b = static_cast(obj); + if(b != nullptr) + PrepareBalance(b, block_time, block_number); + } + else if(value.is() && _es_objects_limit_orders) { + auto obj = db.find_object(value); + auto l = static_cast(obj); + if(l != nullptr) + PrepareLimit(l, block_time, block_number); + } + else if(value.is() && _es_objects_asset_bitasset) { + auto obj = db.find_object(value); + auto ba = static_cast(obj); + if(ba != nullptr) + PrepareBitAsset(ba, block_time, block_number); + } + } +} + +void es_objects_plugin_impl::PrepareProposal(const proposal_object* proposal_object, const fc::time_point_sec block_time, uint32_t block_number) +{ + proposal_struct prop; + prop.object_id = proposal_object->id; + prop.block_time = block_time; + prop.block_number = block_number; + prop.expiration_time = proposal_object->expiration_time; + prop.review_period_time = proposal_object->review_period_time; + prop.proposed_transaction = fc::json::to_string(proposal_object->proposed_transaction); + prop.required_owner_approvals = fc::json::to_string(proposal_object->required_owner_approvals); + prop.available_owner_approvals = fc::json::to_string(proposal_object->available_owner_approvals); + prop.required_active_approvals = fc::json::to_string(proposal_object->required_active_approvals); + prop.available_key_approvals = fc::json::to_string(proposal_object->available_key_approvals); + prop.proposer = proposal_object->proposer; + + std::string data = fc::json::to_string(prop); + prepare = graphene::utilities::createBulk("bitshares-proposal", data, "", 1); + bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare.clear(); +} + +void es_objects_plugin_impl::PrepareAccount(const account_object* account_object, const fc::time_point_sec block_time, uint32_t block_number) +{ + account_struct acct; + acct.object_id = account_object->id; + acct.block_time = block_time; + acct.block_number = block_number; + acct.membership_expiration_date = account_object->membership_expiration_date; + acct.registrar = account_object->registrar; + acct.referrer = account_object->referrer; + acct.lifetime_referrer = account_object->lifetime_referrer; + acct.network_fee_percentage = account_object->network_fee_percentage; + acct.lifetime_referrer_fee_percentage = account_object->lifetime_referrer_fee_percentage; + acct.referrer_rewards_percentage = account_object->referrer_rewards_percentage; + acct.name = account_object->name; + acct.owner_account_auths = fc::json::to_string(account_object->owner.account_auths); + acct.owner_key_auths = fc::json::to_string(account_object->owner.key_auths); + acct.owner_address_auths = fc::json::to_string(account_object->owner.address_auths); + acct.active_account_auths = fc::json::to_string(account_object->active.account_auths); + acct.active_key_auths = fc::json::to_string(account_object->active.key_auths); + acct.active_address_auths = fc::json::to_string(account_object->active.address_auths); + acct.voting_account = account_object->options.voting_account; + + std::string data = fc::json::to_string(acct); + prepare = graphene::utilities::createBulk("bitshares-account", data, "", 1); + bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare.clear(); +} + +void es_objects_plugin_impl::PrepareAsset(const asset_object* asset_object, const fc::time_point_sec block_time, uint32_t block_number) +{ + asset_struct _asset; + _asset.object_id = asset_object->id; + _asset.block_time = block_time; + _asset.block_number = block_number; + _asset.symbol = asset_object->symbol; + _asset.issuer = asset_object->issuer; + _asset.is_market_issued = asset_object->is_market_issued(); + _asset.dynamic_asset_data_id = asset_object->dynamic_asset_data_id; + _asset.bitasset_data_id = asset_object->bitasset_data_id; + + std::string data = fc::json::to_string(_asset); + prepare = graphene::utilities::createBulk("bitshares-asset", data, fc::json::to_string(asset_object->id), 0); + bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare.clear(); +} + +void es_objects_plugin_impl::PrepareBalance(const balance_object* balance_object, const fc::time_point_sec block_time, uint32_t block_number) +{ + balance_struct balance; + balance.object_id = balance_object->id; + balance.block_time = block_time; + balance.block_number = block_number;balance.owner = balance_object->owner; + balance.asset_id = balance_object->balance.asset_id; + balance.amount = balance_object->balance.amount; + + std::string data = fc::json::to_string(balance); + prepare = graphene::utilities::createBulk("bitshares-balance", data, "", 1); + bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare.clear(); +} + +void es_objects_plugin_impl::PrepareLimit(const limit_order_object* limit_object, const fc::time_point_sec block_time, uint32_t block_number) +{ + limit_order_struct limit; + limit.object_id = limit_object->id; + limit.block_time = block_time; + limit.block_number = block_number; + limit.expiration = limit_object->expiration; + limit.seller = limit_object->seller; + limit.for_sale = limit_object->for_sale; + limit.sell_price = limit_object->sell_price; + limit.deferred_fee = limit_object->deferred_fee; + + std::string data = fc::json::to_string(limit); + prepare = graphene::utilities::createBulk("bitshares-limitorder", data, "", 1); + bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare.clear(); +} + +void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object* bitasset_object, const fc::time_point_sec block_time, uint32_t block_number) +{ + if(!bitasset_object->is_prediction_market) { + + auto object_id = bitasset_object->id; + auto it = bitassets.find(object_id); + if(it == bitassets.end()) + bitassets[object_id] = fc::json::to_string(bitasset_object->current_feed); + else { + if(it->second == fc::json::to_string(bitasset_object->current_feed)) return; + else bitassets[object_id] = fc::json::to_string(bitasset_object->current_feed); + } + + bitasset_struct bitasset; + + bitasset.object_id = bitasset_object->id; + bitasset.block_time = block_time; + bitasset.block_number = block_number; + bitasset.current_feed = fc::json::to_string(bitasset_object->current_feed); + bitasset.current_feed_publication_time = bitasset_object->current_feed_publication_time; + + std::string data = fc::json::to_string(bitasset); + prepare = graphene::utilities::createBulk("bitshares-bitasset", data, "", 1); + bulk.insert(bulk.end(), prepare.begin(), prepare.end()); + prepare.clear(); + } +} + +es_objects_plugin_impl::~es_objects_plugin_impl() +{ + return; +} + + +} // end namespace detail + +es_objects_plugin::es_objects_plugin() : + my( new detail::es_objects_plugin_impl(*this) ) +{ +} + +es_objects_plugin::~es_objects_plugin() +{ +} + +std::string es_objects_plugin::plugin_name()const +{ + return "es_objects"; +} +std::string es_objects_plugin::plugin_description()const +{ + return "Stores blockchain objects in ES database. Experimental."; +} + +void es_objects_plugin::plugin_set_program_options( + boost::program_options::options_description& cli, + boost::program_options::options_description& cfg + ) +{ + cli.add_options() + ("es-objects-elasticsearch-url", boost::program_options::value(), "Elasticsearch node url") + ("es-objects-logs", boost::program_options::value(), "Log bulk events to database") + ("es-objects-bulk-replay", boost::program_options::value(), "Number of bulk documents to index on replay(5000)") + ("es-objects-bulk-sync", boost::program_options::value(), "Number of bulk documents to index on a syncronied chain(10)") + ("es-objects-proposals", boost::program_options::value(), "Store proposal objects") + ("es-objects-accounts", boost::program_options::value(), "Store account objects") + ("es-objects-assets", boost::program_options::value(), "Store asset objects") + ("es-objects-balances", boost::program_options::value(), "Store balances objects") + ("es-objects-limit-orders", boost::program_options::value(), "Store limit order objects") + ("es-objects-asset-bitasset", boost::program_options::value(), "Store feed data") + + ; + cfg.add(cli); +} + +void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options) +{ + database().new_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ){ my->updateDatabase(ids, 1); }); + database().changed_objects.connect([&]( const vector& ids, const flat_set& impacted_accounts ){ my->updateDatabase(ids, 0); }); + + if (options.count("es-objects-elasticsearch-url")) { + my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as(); + } + if (options.count("es-objects-logs")) { + my->_es_objects_logs = options["es-objects-logs"].as(); + } + if (options.count("es-objects-bulk-replay")) { + my->_es_objects_bulk_replay = options["es-objects-bulk-replay"].as(); + } + if (options.count("es-objects-bulk-sync")) { + my->_es_objects_bulk_sync = options["es-objects-bulk-sync"].as(); + } + if (options.count("es-objects-proposals")) { + my->_es_objects_proposals = options["es-objects-proposals"].as(); + } + if (options.count("es-objects-accounts")) { + my->_es_objects_accounts = options["es-objects-accounts"].as(); + } + if (options.count("es-objects-assets")) { + my->_es_objects_assets = options["es-objects-assets"].as(); + } + if (options.count("es-objects-balances")) { + my->_es_objects_balances = options["es-objects-balances"].as(); + } + if (options.count("es-objects-limit-orders")) { + my->_es_objects_limit_orders = options["es-objects-limit-orders"].as(); + } + if (options.count("es-objects-asset-bitasset")) { + my->_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as(); + } +} + +void es_objects_plugin::plugin_startup() +{ + ilog("elasticsearch objects: plugin_startup() begin"); +} + +} } \ No newline at end of file diff --git a/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp new file mode 100644 index 00000000..31809e04 --- /dev/null +++ b/libraries/plugins/es_objects/include/graphene/es_objects/es_objects.hpp @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2018 oxarbitrage, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#pragma once + +#include +#include + +namespace graphene { namespace es_objects { + +using namespace chain; + + +namespace detail +{ + class es_objects_plugin_impl; +} + +class es_objects_plugin : public graphene::app::plugin +{ + public: + es_objects_plugin(); + virtual ~es_objects_plugin(); + + std::string plugin_name()const override; + std::string plugin_description()const override; + virtual void plugin_set_program_options( + boost::program_options::options_description& cli, + boost::program_options::options_description& cfg) override; + virtual void plugin_initialize(const boost::program_options::variables_map& options) override; + virtual void plugin_startup() override; + + friend class detail::es_objects_plugin_impl; + std::unique_ptr my; +}; + +struct proposal_struct { + object_id_type object_id; + fc::time_point_sec block_time; + uint32_t block_number; + time_point_sec expiration_time; + optional review_period_time; + string proposed_transaction; + string required_active_approvals; + string available_active_approvals; + string required_owner_approvals; + string available_owner_approvals; + string available_key_approvals; + account_id_type proposer; + +}; +struct account_struct { + object_id_type object_id; + fc::time_point_sec block_time; + uint32_t block_number; + time_point_sec membership_expiration_date; + account_id_type registrar; + account_id_type referrer; + account_id_type lifetime_referrer; + uint16_t network_fee_percentage; + uint16_t lifetime_referrer_fee_percentage; + uint16_t referrer_rewards_percentage; + string name; + string owner_account_auths; + string owner_key_auths; + string owner_address_auths; + string active_account_auths; + string active_key_auths; + string active_address_auths; + account_id_type voting_account; +}; +struct asset_struct { + object_id_type object_id; + fc::time_point_sec block_time; + uint32_t block_number; + string symbol; + account_id_type issuer; + bool is_market_issued; + asset_dynamic_data_id_type dynamic_asset_data_id; + optional bitasset_data_id; + +}; +struct balance_struct { + object_id_type object_id; + fc::time_point_sec block_time; + uint32_t block_number; + address owner; + asset_id_type asset_id; + share_type amount; +}; +struct limit_order_struct { + object_id_type object_id; + fc::time_point_sec block_time; + uint32_t block_number; + time_point_sec expiration; + account_id_type seller; + share_type for_sale; + price sell_price; + share_type deferred_fee; +}; +struct bitasset_struct { + object_id_type object_id; + fc::time_point_sec block_time; + uint32_t block_number; + string current_feed; + time_point_sec current_feed_publication_time; + time_point_sec feed_expiration_time; +}; + +} } //graphene::es_objects + +FC_REFLECT( graphene::es_objects::proposal_struct, (object_id)(block_time)(block_number)(expiration_time)(review_period_time)(proposed_transaction)(required_active_approvals)(available_active_approvals)(required_owner_approvals)(available_owner_approvals)(available_key_approvals)(proposer) ) +FC_REFLECT( graphene::es_objects::account_struct, (object_id)(block_time)(block_number)(membership_expiration_date)(registrar)(referrer)(lifetime_referrer)(network_fee_percentage)(lifetime_referrer_fee_percentage)(referrer_rewards_percentage)(name)(owner_account_auths)(owner_key_auths)(owner_address_auths)(active_account_auths)(active_key_auths)(active_address_auths)(voting_account) ) +FC_REFLECT( graphene::es_objects::asset_struct, (object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id) ) +FC_REFLECT( graphene::es_objects::balance_struct, (object_id)(block_time)(block_number)(block_time)(owner)(asset_id)(amount) ) +FC_REFLECT( graphene::es_objects::limit_order_struct, (object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee) ) +FC_REFLECT( graphene::es_objects::bitasset_struct, (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) ) \ No newline at end of file diff --git a/libraries/utilities/CMakeLists.txt b/libraries/utilities/CMakeLists.txt index f2d646d5..98086b10 100644 --- a/libraries/utilities/CMakeLists.txt +++ b/libraries/utilities/CMakeLists.txt @@ -14,6 +14,7 @@ set(sources string_escape.cpp tempdir.cpp words.cpp + elasticsearch.cpp ${HEADERS}) configure_file("${CMAKE_CURRENT_SOURCE_DIR}/git_revision.cpp.in" "${CMAKE_CURRENT_BINARY_DIR}/git_revision.cpp" @ONLY) diff --git a/libraries/utilities/elasticsearch.cpp b/libraries/utilities/elasticsearch.cpp new file mode 100644 index 00000000..1674a12a --- /dev/null +++ b/libraries/utilities/elasticsearch.cpp @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2018 oxarbitrage, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include + +#include +#include + +namespace graphene { namespace utilities { + +bool SendBulk(CURL *curl, std::vector& bulk, std::string elasticsearch_url, bool do_logs, std::string logs_index) +{ + // curl buffers to read + std::string readBuffer; + std::string readBuffer_logs; + + std::string bulking = ""; + + bulking = boost::algorithm::join(bulk, "\n"); + bulking = bulking + "\n"; + bulk.clear(); + + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "Content-Type: application/json"); + std::string url = elasticsearch_url + "_bulk"; + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_POST, true); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, bulking.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&readBuffer); + curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); + //curl_easy_setopt(curl, CURLOPT_VERBOSE, true); + curl_easy_perform(curl); + + long http_code = 0; + curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); + if(http_code == 200) { + // all good, do nothing + } + else if(http_code == 413) { + elog("413 error: Can be low space disk"); + return 0; + } + else { + elog(http_code + "error: Unknown error"); + return 0; + } + + if(do_logs) { + auto logs = readBuffer; + // do logs + std::string url_logs = elasticsearch_url + logs_index + "/data/"; + curl_easy_setopt(curl, CURLOPT_URL, url_logs.c_str()); + curl_easy_setopt(curl, CURLOPT_POST, true); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, logs.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &readBuffer_logs); + curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); + curl_easy_perform(curl); + + http_code = 0; + curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); + if(http_code == 200) { + // all good, do nothing + return 1; + } + else if(http_code == 201) { + // 201 is ok + return 1; + } + else if(http_code == 409) { + // 409 for record already exist is ok + return 1; + } + else if(http_code == 413) { + elog("413 error: Can be low space disk"); + return 0; + } + else { + elog(http_code + "error: Unknown error"); + return 0; + } + } + return 0; +} + +std::vector createBulk(std::string index_name, std::string data, std::string id, bool onlycreate) +{ + std::vector bulk; + std::string create_string = ""; + if(!onlycreate) + create_string = ",\"_id\" : "+id; + + bulk.push_back("{ \"index\" : { \"_index\" : \""+index_name+"\", \"_type\" : \"data\" "+create_string+" } }"); + bulk.push_back(data); + + return bulk; +} + + +} } // end namespace graphene::utilities diff --git a/libraries/utilities/include/graphene/utilities/elasticsearch.hpp b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp new file mode 100644 index 00000000..517f2345 --- /dev/null +++ b/libraries/utilities/include/graphene/utilities/elasticsearch.hpp @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2018 oxarbitrage, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#pragma once +#include +#include +#include + +#include + +static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) +{ + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; +} + +namespace graphene { namespace utilities { + + bool SendBulk(CURL *curl, std::vector & bulk, std::string elasticsearch_url, bool do_logs, std::string logs_index); + std::vector createBulk(std::string type, std::string data, std::string id, bool onlycreate); + +} } // end namespace graphene::utilities diff --git a/programs/witness_node/CMakeLists.txt b/programs/witness_node/CMakeLists.txt index 0aa73cf1..0c4c1db4 100644 --- a/programs/witness_node/CMakeLists.txt +++ b/programs/witness_node/CMakeLists.txt @@ -11,7 +11,7 @@ endif() # We have to link against graphene_debug_witness because deficiency in our API infrastructure doesn't allow plugins to be fully abstracted #246 target_link_libraries( witness_node - PRIVATE graphene_app graphene_account_history graphene_affiliate_stats graphene_elasticsearch graphene_market_history graphene_witness graphene_chain graphene_debug_witness graphene_bookie graphene_egenesis_full fc ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) + PRIVATE graphene_app graphene_account_history graphene_affiliate_stats graphene_elasticsearch graphene_market_history graphene_witness graphene_chain graphene_debug_witness graphene_bookie graphene_egenesis_full graphene_es_objects fc ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) # also add dependencies to graphene_generate_genesis graphene_generate_uia_sharedrop_genesis if you want those plugins install( TARGETS diff --git a/programs/witness_node/main.cpp b/programs/witness_node/main.cpp index 65b36c04..0d6e65c6 100644 --- a/programs/witness_node/main.cpp +++ b/programs/witness_node/main.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include //#include //#include @@ -76,6 +77,7 @@ int main(int argc, char** argv) { auto debug_witness_plug = node->register_plugin(); auto history_plug = node->register_plugin(); auto elasticsearch_plug = node->register_plugin(); + auto es_objects_plug = node->register_plugin(); auto market_history_plug = node->register_plugin(); //auto generate_genesis_plug = node->register_plugin(); //auto generate_uia_sharedrop_genesis_plug = node->register_plugin();