Resolve "port ES changes from Bitshares"

This commit is contained in:
Vlad Dobromyslov 2022-01-31 05:25:56 +00:00 committed by serkixenos
parent 10799a2148
commit 39fcacd397
6 changed files with 376 additions and 303 deletions

View file

@ -22,6 +22,7 @@
* THE SOFTWARE.
*/
#include <boost/algorithm/string.hpp>
#include <graphene/elasticsearch/elasticsearch_plugin.hpp>
#include <graphene/chain/impacted.hpp>
#include <graphene/chain/account_evaluator.hpp>
@ -33,6 +34,15 @@ namespace graphene { namespace elasticsearch {
namespace detail
{
const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix)
{
auto block_date_string = block_date.to_iso_string();
std::vector<std::string> parts;
boost::split(parts, block_date_string, boost::is_any_of("-"));
std::string index_name = _elasticsearch_index_prefix + parts[0] + "-" + parts[1];
return index_name;
}
class elasticsearch_plugin_impl
{
public:
@ -48,6 +58,9 @@ class elasticsearch_plugin_impl
return _self.database();
}
friend class graphene::elasticsearch::elasticsearch_plugin;
private:
elasticsearch_plugin& _self;
primary_index< operation_history_index >* _oho_index;
@ -75,6 +88,8 @@ class elasticsearch_plugin_impl
std::string bulk_line;
std::string index_name;
bool is_sync = false;
bool is_es_version_7_or_above = true;
private:
bool add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho, const uint32_t block_number );
const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj,
@ -91,6 +106,7 @@ class elasticsearch_plugin_impl
void createBulkLine(const account_transaction_history_object& ath);
void prepareBulk(const account_transaction_history_id_type& ath_id);
void populateESstruct();
void init_program_options(const boost::program_options::variables_map& options);
};
elasticsearch_plugin_impl::~elasticsearch_plugin_impl()
@ -105,7 +121,7 @@ elasticsearch_plugin_impl::~elasticsearch_plugin_impl()
bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b )
{
checkState(b.timestamp);
index_name = graphene::utilities::generateIndexName(b.timestamp, _elasticsearch_index_prefix);
index_name = generateIndexName(b.timestamp, _elasticsearch_index_prefix);
graphene::chain::database& db = database();
const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
@ -229,6 +245,113 @@ void elasticsearch_plugin_impl::getOperationType(const optional <operation_histo
op_type = oho->op.which();
}
struct adaptor_struct
{
variant adapt(const variant_object& op)
{
fc::mutable_variant_object o(op);
vector<string> keys_to_rename;
for (auto& i : o)
{
auto& element = i.value();
if (element.is_object())
{
const string& name = i.key();
const auto& vo = element.get_object();
if (vo.contains(name.c_str()))
keys_to_rename.emplace_back(name);
element = adapt(vo);
}
else if (element.is_array())
adapt(element.get_array());
}
for (const auto& i : keys_to_rename)
{
string new_name = i + "_";
o[new_name] = variant(o[i]);
o.erase(i);
}
if (o.find("memo") != o.end())
{
auto& memo = o["memo"];
if (memo.is_string())
{
o["memo_"] = o["memo"];
o.erase("memo");
}
else if (memo.is_object())
{
fc::mutable_variant_object tmp(memo.get_object());
if (tmp.find("nonce") != tmp.end())
{
tmp["nonce"] = tmp["nonce"].as_string();
o["memo"] = tmp;
}
}
}
if (o.find("new_parameters") != o.end())
{
auto& tmp = o["new_parameters"];
if (tmp.is_object())
{
fc::mutable_variant_object tmp2(tmp.get_object());
if (tmp2.find("current_fees") != tmp2.end())
{
tmp2.erase("current_fees");
o["new_parameters"] = tmp2;
}
}
}
if (o.find("owner") != o.end() && o["owner"].is_string())
{
o["owner_"] = o["owner"].as_string();
o.erase("owner");
}
if (o.find("proposed_ops") != o.end())
{
o["proposed_ops"] = fc::json::to_string(o["proposed_ops"]);
}
if (o.find("initializer") != o.end())
{
o["initializer"] = fc::json::to_string(o["initializer"]);
}
if (o.find("policy") != o.end())
{
o["policy"] = fc::json::to_string(o["policy"]);
}
if (o.find("predicates") != o.end())
{
o["predicates"] = fc::json::to_string(o["predicates"]);
}
if (o.find("active_special_authority") != o.end())
{
o["active_special_authority"] = fc::json::to_string(o["active_special_authority"]);
}
if (o.find("owner_special_authority") != o.end())
{
o["owner_special_authority"] = fc::json::to_string(o["owner_special_authority"]);
}
variant v;
fc::to_variant(o, v, FC_PACK_MAX_DEPTH);
return v;
}
void adapt(fc::variants& v)
{
for (auto& array_element : v)
{
if (array_element.is_object())
array_element = adapt(array_element.get_object());
else if (array_element.is_array())
adapt(array_element.get_array());
else
array_element = array_element.as_string();
}
}
};
void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_history_object>& oho)
{
os.trx_in_block = oho->trx_in_block;
@ -255,6 +378,61 @@ void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_bloc
bs.trx_id = trx_id;
}
struct operation_visitor
{
using result_type = void;
share_type fee_amount;
asset_id_type fee_asset;
asset_id_type transfer_asset_id;
share_type transfer_amount;
account_id_type transfer_from;
account_id_type transfer_to;
void operator()( const graphene::chain::transfer_operation& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;
transfer_asset_id = o.amount.asset_id;
transfer_amount = o.amount.amount;
transfer_from = o.from;
transfer_to = o.to;
}
object_id_type fill_order_id;
account_id_type fill_account_id;
asset_id_type fill_pays_asset_id;
share_type fill_pays_amount;
asset_id_type fill_receives_asset_id;
share_type fill_receives_amount;
//double fill_fill_price;
//bool fill_is_maker;
void operator()( const graphene::chain::fill_order_operation& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;
fill_order_id = o.order_id;
fill_account_id = o.account_id;
fill_pays_asset_id = o.pays.asset_id;
fill_pays_amount = o.pays.amount;
fill_receives_asset_id = o.receives.asset_id;
fill_receives_amount = o.receives.amount;
//fill_fill_price = o.fill_price.to_real();
//fill_is_maker = o.is_maker;
}
template<typename T>
void operator()( const T& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;
}
};
void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_object>& oho)
{
graphene::chain::database& db = database();
@ -380,7 +558,8 @@ void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id
const std::string _id = fc::json::to_string(ath_id);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = index_name;
bulk_header["_type"] = "data";
if( !is_es_version_7_or_above )
bulk_header["_type"] = "_doc";
bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "."
+ fc::to_string(ath_id.instance.value);
prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line));
@ -428,6 +607,43 @@ void elasticsearch_plugin_impl::populateESstruct()
es.query = "";
}
void elasticsearch_plugin_impl::init_program_options(const boost::program_options::variables_map& options)
{
if (options.count("elasticsearch-node-url")) {
_elasticsearch_node_url = options["elasticsearch-node-url"].as<std::string>();
}
if (options.count("elasticsearch-bulk-replay")) {
_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as<uint32_t>();
}
if (options.count("elasticsearch-bulk-sync")) {
_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as<uint32_t>();
}
if (options.count("elasticsearch-visitor")) {
_elasticsearch_visitor = options["elasticsearch-visitor"].as<bool>();
}
if (options.count("elasticsearch-basic-auth")) {
_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as<std::string>();
}
if (options.count("elasticsearch-index-prefix")) {
_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as<std::string>();
}
if (options.count("elasticsearch-operation-object")) {
_elasticsearch_operation_object = options["elasticsearch-operation-object"].as<bool>();
}
if (options.count("elasticsearch-start-es-after-block")) {
_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
if (options.count("elasticsearch-operation-string")) {
_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
if (options.count("elasticsearch-mode")) {
const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
if(option_number > mode::all)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
}
}
} // end namespace detail
elasticsearch_plugin::elasticsearch_plugin() :
@ -480,42 +696,12 @@ void elasticsearch_plugin::plugin_set_program_options(
void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
ilog("elasticsearch ACCOUNT HISTORY: plugin_initialize() begin");
my->_oho_index = database().add_index< primary_index< operation_history_index > >();
database().add_index< primary_index< account_transaction_history_index > >();
if (options.count("elasticsearch-node-url")) {
my->_elasticsearch_node_url = options["elasticsearch-node-url"].as<std::string>();
}
if (options.count("elasticsearch-bulk-replay")) {
my->_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as<uint32_t>();
}
if (options.count("elasticsearch-bulk-sync")) {
my->_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as<uint32_t>();
}
if (options.count("elasticsearch-visitor")) {
my->_elasticsearch_visitor = options["elasticsearch-visitor"].as<bool>();
}
if (options.count("elasticsearch-basic-auth")) {
my->_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as<std::string>();
}
if (options.count("elasticsearch-index-prefix")) {
my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as<std::string>();
}
if (options.count("elasticsearch-operation-object")) {
my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as<bool>();
}
if (options.count("elasticsearch-start-es-after-block")) {
my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
if (options.count("elasticsearch-operation-string")) {
my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
}
if (options.count("elasticsearch-mode")) {
const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
if(option_number > mode::all)
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
my->_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
}
my->init_program_options( options );
if(my->_elasticsearch_mode != mode::only_query) {
if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string)
@ -528,10 +714,7 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
"Error populating ES database, we are going to keep trying.");
});
}
}
void elasticsearch_plugin::plugin_startup()
{
graphene::utilities::ES es;
es.curl = my->curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
@ -539,7 +722,17 @@ void elasticsearch_plugin::plugin_startup()
if(!graphene::utilities::checkES(es))
FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url));
graphene::utilities::checkESVersion7OrAbove(es, my->is_es_version_7_or_above);
ilog("elasticsearch ACCOUNT HISTORY: plugin_initialize() end");
}
void elasticsearch_plugin::plugin_startup()
{
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
// Nothing to do
ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() end");
}
operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id)
@ -655,7 +848,7 @@ graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
es.curl = curl;
es.elasticsearch_url = my->_elasticsearch_node_url;
es.index_prefix = my->_elasticsearch_index_prefix;
es.endpoint = es.index_prefix + "*/data/_search";
es.endpoint = es.index_prefix + "*/_doc/_search";
es.query = query;
return es;

View file

@ -79,62 +79,6 @@ class elasticsearch_plugin : public graphene::app::plugin
graphene::utilities::ES prepareHistoryQuery(string query);
};
struct operation_visitor
{
typedef void result_type;
share_type fee_amount;
asset_id_type fee_asset;
asset_id_type transfer_asset_id;
share_type transfer_amount;
account_id_type transfer_from;
account_id_type transfer_to;
void operator()( const graphene::chain::transfer_operation& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;
transfer_asset_id = o.amount.asset_id;
transfer_amount = o.amount.amount;
transfer_from = o.from;
transfer_to = o.to;
}
object_id_type fill_order_id;
account_id_type fill_account_id;
asset_id_type fill_pays_asset_id;
share_type fill_pays_amount;
asset_id_type fill_receives_asset_id;
share_type fill_receives_amount;
//double fill_fill_price;
//bool fill_is_maker;
void operator()( const graphene::chain::fill_order_operation& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;
fill_order_id = o.order_id;
fill_account_id = o.account_id;
fill_pays_asset_id = o.pays.asset_id;
fill_pays_amount = o.pays.amount;
fill_receives_asset_id = o.receives.asset_id;
fill_receives_amount = o.receives.amount;
//fill_fill_price = o.fill_price.to_real();
//fill_is_maker = o.is_maker;
}
template<typename T>
void operator()( const T& o )
{
fee_asset = o.fee.asset_id;
fee_amount = o.fee.amount;
}
};
struct operation_history_struct {
int trx_in_block;
int op_in_trx;
@ -197,113 +141,6 @@ struct bulk_struct {
optional<visitor_struct> additional_data;
};
struct adaptor_struct {
variant adapt(const variant_object& op)
{
fc::mutable_variant_object o(op);
vector<string> keys_to_rename;
for (auto i = o.begin(); i != o.end(); ++i)
{
auto& element = (*i).value();
if (element.is_object())
{
const string& name = (*i).key();
auto& vo = element.get_object();
if (vo.contains(name.c_str()))
keys_to_rename.emplace_back(name);
element = adapt(vo);
}
else if (element.is_array())
adapt(element.get_array());
}
for (const auto& i : keys_to_rename)
{
string new_name = i + "_";
o[new_name] = variant(o[i]);
o.erase(i);
}
if (o.find("memo") != o.end())
{
auto& memo = o["memo"];
if (memo.is_string())
{
o["memo_"] = o["memo"];
o.erase("memo");
}
else if (memo.is_object())
{
fc::mutable_variant_object tmp(memo.get_object());
if (tmp.find("nonce") != tmp.end())
{
tmp["nonce"] = tmp["nonce"].as_string();
o["memo"] = tmp;
}
}
}
if (o.find("new_parameters") != o.end())
{
auto& tmp = o["new_parameters"];
if (tmp.is_object())
{
fc::mutable_variant_object tmp2(tmp.get_object());
if (tmp2.find("current_fees") != tmp2.end())
{
tmp2.erase("current_fees");
o["new_parameters"] = tmp2;
}
}
}
if (o.find("owner") != o.end() && o["owner"].is_string())
{
o["owner_"] = o["owner"].as_string();
o.erase("owner");
}
if (o.find("proposed_ops") != o.end())
{
o["proposed_ops"] = fc::json::to_string(o["proposed_ops"]);
}
if (o.find("initializer") != o.end())
{
o["initializer"] = fc::json::to_string(o["initializer"]);
}
if (o.find("policy") != o.end())
{
o["policy"] = fc::json::to_string(o["policy"]);
}
if (o.find("predicates") != o.end())
{
o["predicates"] = fc::json::to_string(o["predicates"]);
}
if (o.find("active_special_authority") != o.end())
{
o["active_special_authority"] = fc::json::to_string(o["active_special_authority"]);
}
if (o.find("owner_special_authority") != o.end())
{
o["owner_special_authority"] = fc::json::to_string(o["owner_special_authority"]);
}
variant v;
fc::to_variant(o, v, FC_PACK_MAX_DEPTH);
return v;
}
void adapt(fc::variants& v)
{
for (auto& array_element : v)
{
if (array_element.is_object())
array_element = adapt(array_element.get_object());
else if (array_element.is_array())
adapt(array_element.get_array());
else
array_element = array_element.as_string();
}
}
};
} } //graphene::elasticsearch
FC_REFLECT_ENUM( graphene::elasticsearch::mode, (only_save)(only_query)(all) )

View file

@ -66,6 +66,9 @@ class es_objects_plugin_impl
bool genesis();
void remove_from_database(object_id_type id, std::string index);
friend class graphene::es_objects::es_objects_plugin;
private:
es_objects_plugin& _self;
std::string _es_objects_elasticsearch_url = "http://localhost:9200/";
std::string _es_objects_auth = "";
@ -97,10 +100,12 @@ class es_objects_plugin_impl
uint32_t block_number;
fc::time_point_sec block_time;
bool is_es_version_7_or_above = true;
private:
template<typename T>
void prepareTemplate(T blockchain_object, string index_name);
void init_program_options(const boost::program_options::variables_map& options);
};
bool es_objects_plugin_impl::genesis()
@ -523,7 +528,8 @@ void es_objects_plugin_impl::remove_from_database( object_id_type id, std::strin
fc::mutable_variant_object delete_line;
delete_line["_id"] = string(id);
delete_line["_index"] = _es_objects_index_prefix + index;
delete_line["_type"] = "data";
if( !is_es_version_7_or_above )
delete_line["_type"] = "_doc";
fc::mutable_variant_object final_delete_line;
final_delete_line["delete"] = delete_line;
prepare.push_back(fc::json::to_string(final_delete_line));
@ -537,7 +543,8 @@ void es_objects_plugin_impl::prepareTemplate(T blockchain_object, string index_n
{
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = _es_objects_index_prefix + index_name;
bulk_header["_type"] = "data";
if( !is_es_version_7_or_above )
bulk_header["_type"] = "_doc";
if(_es_objects_keep_only_current)
{
bulk_header["_id"] = string(blockchain_object.id);
@ -567,6 +574,72 @@ es_objects_plugin_impl::~es_objects_plugin_impl()
}
return;
}
void es_objects_plugin_impl::init_program_options(const boost::program_options::variables_map& options)
{
if (options.count("es-objects-elasticsearch-url")) {
_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as<std::string>();
}
if (options.count("es-objects-auth")) {
_es_objects_auth = options["es-objects-auth"].as<std::string>();
}
if (options.count("es-objects-bulk-replay")) {
_es_objects_bulk_replay = options["es-objects-bulk-replay"].as<uint32_t>();
}
if (options.count("es-objects-bulk-sync")) {
_es_objects_bulk_sync = options["es-objects-bulk-sync"].as<uint32_t>();
}
if (options.count("es-objects-proposals")) {
_es_objects_proposals = options["es-objects-proposals"].as<bool>();
}
if (options.count("es-objects-accounts")) {
_es_objects_accounts = options["es-objects-accounts"].as<bool>();
}
if (options.count("es-objects-assets")) {
_es_objects_assets = options["es-objects-assets"].as<bool>();
}
if (options.count("es-objects-balances")) {
_es_objects_balances = options["es-objects-balances"].as<bool>();
}
if (options.count("es-objects-limit-orders")) {
_es_objects_limit_orders = options["es-objects-limit-orders"].as<bool>();
}
if (options.count("es-objects-asset-bitasset")) {
_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as<bool>();
}
if (options.count("es-objects-account-role")) {
_es_objects_balances = options["es-objects-account-role"].as<bool>();
}
if (options.count("es-objects-committee-member")) {
_es_objects_balances = options["es-objects-committee-member"].as<bool>();
}
if (options.count("es-objects-nft")) {
_es_objects_balances = options["es-objects-nft"].as<bool>();
}
if (options.count("es-objects-son")) {
_es_objects_balances = options["es-objects-son"].as<bool>();
}
if (options.count("es-objects-transaction")) {
_es_objects_balances = options["es-objects-transaction"].as<bool>();
}
if (options.count("es-objects-vesting-balance")) {
_es_objects_balances = options["es-objects-vesting-balance"].as<bool>();
}
if (options.count("es-objects-witness")) {
_es_objects_balances = options["es-objects-witness"].as<bool>();
}
if (options.count("es-objects-worker")) {
_es_objects_balances = options["es-objects-worker"].as<bool>();
}
if (options.count("es-objects-index-prefix")) {
_es_objects_index_prefix = options["es-objects-index-prefix"].as<std::string>();
}
if (options.count("es-objects-keep-only-current")) {
_es_objects_keep_only_current = options["es-objects-keep-only-current"].as<bool>();
}
if (options.count("es-objects-start-es-after-block")) {
_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as<uint32_t>();
}
}
} // end namespace detail
@ -627,69 +700,9 @@ void es_objects_plugin::plugin_set_program_options(
void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
if (options.count("es-objects-elasticsearch-url")) {
my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as<std::string>();
}
if (options.count("es-objects-auth")) {
my->_es_objects_auth = options["es-objects-auth"].as<std::string>();
}
if (options.count("es-objects-bulk-replay")) {
my->_es_objects_bulk_replay = options["es-objects-bulk-replay"].as<uint32_t>();
}
if (options.count("es-objects-bulk-sync")) {
my->_es_objects_bulk_sync = options["es-objects-bulk-sync"].as<uint32_t>();
}
if (options.count("es-objects-proposals")) {
my->_es_objects_proposals = options["es-objects-proposals"].as<bool>();
}
if (options.count("es-objects-accounts")) {
my->_es_objects_accounts = options["es-objects-accounts"].as<bool>();
}
if (options.count("es-objects-assets")) {
my->_es_objects_assets = options["es-objects-assets"].as<bool>();
}
if (options.count("es-objects-balances")) {
my->_es_objects_balances = options["es-objects-balances"].as<bool>();
}
if (options.count("es-objects-limit-orders")) {
my->_es_objects_limit_orders = options["es-objects-limit-orders"].as<bool>();
}
if (options.count("es-objects-asset-bitasset")) {
my->_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as<bool>();
}
if (options.count("es-objects-account-role")) {
my->_es_objects_balances = options["es-objects-account-role"].as<bool>();
}
if (options.count("es-objects-committee-member")) {
my->_es_objects_balances = options["es-objects-committee-member"].as<bool>();
}
if (options.count("es-objects-nft")) {
my->_es_objects_balances = options["es-objects-nft"].as<bool>();
}
if (options.count("es-objects-son")) {
my->_es_objects_balances = options["es-objects-son"].as<bool>();
}
if (options.count("es-objects-transaction")) {
my->_es_objects_balances = options["es-objects-transaction"].as<bool>();
}
if (options.count("es-objects-vesting-balance")) {
my->_es_objects_balances = options["es-objects-vesting-balance"].as<bool>();
}
if (options.count("es-objects-witness")) {
my->_es_objects_balances = options["es-objects-witness"].as<bool>();
}
if (options.count("es-objects-worker")) {
my->_es_objects_balances = options["es-objects-worker"].as<bool>();
}
if (options.count("es-objects-index-prefix")) {
my->_es_objects_index_prefix = options["es-objects-index-prefix"].as<std::string>();
}
if (options.count("es-objects-keep-only-current")) {
my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as<bool>();
}
if (options.count("es-objects-start-es-after-block")) {
my->_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as<uint32_t>();
}
ilog("elasticsearch OBJECTS: plugin_initialize() begin");
my->init_program_options( options );
database().applied_block.connect([this](const signed_block &b) {
if(b.block_num() == 1 && my->_es_objects_start_es_after_block == 0) {
@ -721,10 +734,7 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable
"Error deleting object from ES database, we are going to keep trying.");
}
});
}
void es_objects_plugin::plugin_startup()
{
graphene::utilities::ES es;
es.curl = my->curl;
es.elasticsearch_url = my->_es_objects_elasticsearch_url;
@ -733,7 +743,17 @@ void es_objects_plugin::plugin_startup()
if(!graphene::utilities::checkES(es))
FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_es_objects_elasticsearch_url));
ilog("elasticsearch OBJECTS: plugin_startup() begin");
graphene::utilities::checkESVersion7OrAbove(es, my->is_es_version_7_or_above);
ilog("elasticsearch OBJECTS: plugin_initialize() end");
}
void es_objects_plugin::plugin_startup()
{
ilog("elasticsearch OBJECTS: plugin_startup() begin");
// Nothing to do
ilog("elasticsearch OBJECTS: plugin_startup() end");
}
} }

View file

@ -24,7 +24,6 @@
#include <graphene/utilities/elasticsearch.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string.hpp>
#include <fc/log/logger.hpp>
#include <fc/io/json.hpp>
@ -47,8 +46,36 @@ bool checkES(ES& es)
if(doCurl(curl_request).empty())
return false;
return true;
}
const std::string getESVersion(ES& es)
{
graphene::utilities::CurlRequest curl_request;
curl_request.handler = es.curl;
curl_request.url = es.elasticsearch_url;
curl_request.auth = es.auth;
curl_request.type = "GET";
fc::variant response = fc::json::from_string(doCurl(curl_request));
return response["version"]["number"].as_string();
}
void checkESVersion7OrAbove(ES& es, bool& result) noexcept
{
static const int64_t version_7 = 7;
try {
const auto es_version = graphene::utilities::getESVersion(es);
auto dot_pos = es_version.find('.');
result = ( std::stoi(es_version.substr(0,dot_pos)) >= version_7 );
}
catch( ... )
{
wlog( "Unable to get ES version, assuming it is 7 or above" );
result = true;
}
}
const std::string simpleQuery(ES& es)
{
graphene::utilities::CurlRequest curl_request;
@ -118,13 +145,13 @@ bool handleBulkResponse(long http_code, const std::string& CurlReadBuffer)
return true;
}
const std::vector<std::string> createBulk(const fc::mutable_variant_object& bulk_header, const std::string& data)
const std::vector<std::string> createBulk(const fc::mutable_variant_object& bulk_header, const std::string&& data)
{
std::vector<std::string> bulk;
fc::mutable_variant_object final_bulk_header;
final_bulk_header["index"] = bulk_header;
bulk.push_back(fc::json::to_string(final_bulk_header));
bulk.push_back(data);
bulk.emplace_back(std::move(data));
return bulk;
}
@ -154,15 +181,6 @@ const std::string getEndPoint(ES& es)
return doCurl(curl_request);
}
const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix)
{
auto block_date_string = block_date.to_iso_string();
std::vector<std::string> parts;
boost::split(parts, block_date_string, boost::is_any_of("-"));
std::string index_name = _elasticsearch_index_prefix + parts[0] + "-" + parts[1];
return index_name;
}
const std::string doCurl(CurlRequest& curl)
{
std::string CurlReadBuffer;

View file

@ -54,13 +54,14 @@ namespace graphene { namespace utilities {
};
bool SendBulk(ES& es);
const std::vector<std::string> createBulk(const fc::mutable_variant_object& bulk_header, const std::string& data);
const std::vector<std::string> createBulk(const fc::mutable_variant_object& bulk_header, const std::string&& data);
bool checkES(ES& es);
const std::string getESVersion(ES& es);
void checkESVersion7OrAbove(ES& es, bool& result) noexcept;
const std::string simpleQuery(ES& es);
bool deleteAll(ES& es);
bool handleBulkResponse(long http_code, const std::string& CurlReadBuffer);
const std::string getEndPoint(ES& es);
const std::string generateIndexName(const fc::time_point_sec& block_date, const std::string& _elasticsearch_index_prefix);
const std::string doCurl(CurlRequest& curl);
const std::string joinBulkLines(const std::vector<std::string>& bulk);
long getResponseCode(CURL *handler);

View file

@ -38,6 +38,10 @@ using namespace graphene::chain;
using namespace graphene::chain::test;
using namespace graphene::app;
const std::string g_es_url = "http://localhost:9200/";
const std::string g_es_index_prefix = "peerplays-";
const std::string g_es_ppobjects_prefix = "ppobjects-";
BOOST_FIXTURE_TEST_SUITE( elasticsearch_tests, database_fixture )
BOOST_AUTO_TEST_CASE(elasticsearch_account_history) {
@ -48,8 +52,8 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) {
graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = "http://localhost:9200/";
es.index_prefix = "peerplays-";
es.elasticsearch_url = g_es_url;
es.index_prefix = g_es_index_prefix;
//es.auth = "elastic:changeme";
// delete all first
@ -71,7 +75,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) {
//int account_create_op_id = operation::tag<account_create_operation>::value;
string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }";
es.endpoint = es.index_prefix + "*/data/_count";
es.endpoint = es.index_prefix + "*/_doc/_count";
es.query = query;
auto res = graphene::utilities::simpleQuery(es);
@ -79,7 +83,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) {
auto total = j["count"].as_string();
BOOST_CHECK_EQUAL(total, "5");
es.endpoint = es.index_prefix + "*/data/_search";
es.endpoint = es.index_prefix + "*/_doc/_search";
res = graphene::utilities::simpleQuery(es);
j = fc::json::from_string(res);
auto first_id = j["hits"]["hits"][size_t(0)]["_id"].as_string();
@ -91,7 +95,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) {
fc::usleep(fc::milliseconds(1000)); // index.refresh_interval
es.endpoint = es.index_prefix + "*/data/_count";
es.endpoint = es.index_prefix + "*/_doc/_count";
res = graphene::utilities::simpleQuery(es);
j = fc::json::from_string(res);
@ -114,9 +118,9 @@ BOOST_AUTO_TEST_CASE(elasticsearch_account_history) {
// check the visitor data
auto block_date = db.head_block_time();
std::string index_name = graphene::utilities::generateIndexName(block_date, "peerplays-");
std::string index_name = g_es_index_prefix + block_date.to_iso_string().substr( 0, 7 ); // yyyy-mm
es.endpoint = index_name + "/data/2.9.12"; // we know last op is a transfer of amount 300
es.endpoint = index_name + "/_doc/2.9.12"; // we know last op is a transfer of amount 300
res = graphene::utilities::getEndPoint(es);
j = fc::json::from_string(res);
auto last_transfer_amount = j["_source"]["operation_history"]["op_object"]["amount_"]["amount"].as_string();
@ -137,8 +141,8 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = "http://localhost:9200/";
es.index_prefix = "ppobjects-";
es.elasticsearch_url = g_es_url;
es.index_prefix = g_es_ppobjects_prefix;
//es.auth = "elastic:changeme";
// delete all first
@ -155,7 +159,7 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
fc::usleep(fc::milliseconds(1000));
string query = "{ \"query\" : { \"bool\" : { \"must\" : [{\"match_all\": {}}] } } }";
es.endpoint = es.index_prefix + "*/data/_count";
es.endpoint = es.index_prefix + "*/_doc/_count";
es.query = query;
auto res = graphene::utilities::simpleQuery(es);
@ -163,14 +167,14 @@ BOOST_AUTO_TEST_CASE(elasticsearch_objects) {
auto total = j["count"].as_string();
BOOST_CHECK_EQUAL(total, "2");
es.endpoint = es.index_prefix + "asset/data/_search";
es.endpoint = es.index_prefix + "asset/_doc/_search";
res = graphene::utilities::simpleQuery(es);
j = fc::json::from_string(res);
auto first_id = j["hits"]["hits"][size_t(0)]["_source"]["symbol"].as_string();
BOOST_CHECK_EQUAL(first_id, "USD");
auto bitasset_data_id = j["hits"]["hits"][size_t(0)]["_source"]["bitasset_data_id"].as_string();
es.endpoint = es.index_prefix + "bitasset/data/_search";
es.endpoint = es.index_prefix + "bitasset/_doc/_search";
es.query = "{ \"query\" : { \"bool\": { \"must\" : [{ \"term\": { \"object_id\": \""+bitasset_data_id+"\"}}] } } }";
res = graphene::utilities::simpleQuery(es);
j = fc::json::from_string(res);
@ -192,11 +196,11 @@ BOOST_AUTO_TEST_CASE(elasticsearch_suite) {
graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = "http://localhost:9200/";
es.index_prefix = "peerplays-";
es.elasticsearch_url = g_es_url;
es.index_prefix = g_es_index_prefix;
auto delete_account_history = graphene::utilities::deleteAll(es);
fc::usleep(fc::milliseconds(1000));
es.index_prefix = "ppobjects-";
es.index_prefix = g_es_ppobjects_prefix;
auto delete_objects = graphene::utilities::deleteAll(es);
fc::usleep(fc::milliseconds(1000));
@ -218,8 +222,8 @@ BOOST_AUTO_TEST_CASE(elasticsearch_history_api) {
graphene::utilities::ES es;
es.curl = curl;
es.elasticsearch_url = "http://localhost:9200/";
es.index_prefix = "peerplays-";
es.elasticsearch_url = g_es_url;
es.index_prefix = g_es_index_prefix;
auto delete_account_history = graphene::utilities::deleteAll(es);