Merge pull request #1271 from oxarbitrage/es_objects

refine es_objects plugin
This commit is contained in:
Alfredo Garcia 2018-10-15 14:42:45 -03:00 committed by gladcow
parent c9583f4486
commit 2d19aa3de1
2 changed files with 183 additions and 136 deletions

View file

@ -47,13 +47,14 @@ class es_objects_plugin_impl
{ curl = curl_easy_init(); }
virtual ~es_objects_plugin_impl();
bool updateDatabase( const vector<object_id_type>& ids , bool isNew);
bool index_database( const vector<object_id_type>& ids, std::string action);
void remove_from_database( object_id_type id, std::string index);
es_objects_plugin& _self;
std::string _es_objects_elasticsearch_url = "http://localhost:9200/";
std::string _es_objects_auth = "";
uint32_t _es_objects_bulk_replay = 5000;
uint32_t _es_objects_bulk_sync = 10;
uint32_t _es_objects_bulk_replay = 10000;
uint32_t _es_objects_bulk_sync = 100;
bool _es_objects_proposals = true;
bool _es_objects_accounts = true;
bool _es_objects_assets = true;
@ -64,27 +65,27 @@ class es_objects_plugin_impl
CURL *curl; // curl handler
vector <std::string> bulk;
vector<std::string> prepare;
map<object_id_type, bitasset_struct> bitassets;
map<object_id_type, account_struct> accounts;
map<object_id_type, proposal_struct> proposals;
map<object_id_type, asset_struct> assets;
map<object_id_type, balance_struct> balances;
map<object_id_type, limit_order_struct> limit_orders;
bool _es_objects_keep_only_current = true;
uint32_t block_number;
fc::time_point_sec block_time;
private:
void PrepareProposal(const proposal_object& proposal_object, const fc::time_point_sec& block_time, const uint32_t& block_number);
void PrepareAccount(const account_object& account_object, const fc::time_point_sec& block_time, const uint32_t& block_number);
void PrepareAsset(const asset_object& asset_object, const fc::time_point_sec& block_time, const uint32_t& block_number);
void PrepareBalance(const balance_object& balance_object, const fc::time_point_sec& block_time, const uint32_t& block_number);
void PrepareLimit(const limit_order_object& limit_object, const fc::time_point_sec& block_time, const uint32_t& block_number);
void PrepareBitAsset(const asset_bitasset_data_object& bitasset_object, const fc::time_point_sec& block_time, const uint32_t& block_number);
void prepare_proposal(const proposal_object& proposal_object);
void prepare_account(const account_object& account_object);
void prepare_asset(const asset_object& asset_object);
void prepare_balance(const account_balance_object& account_balance_object);
void prepare_limit(const limit_order_object& limit_object);
void prepare_bitasset(const asset_bitasset_data_object& bitasset_object);
};
bool es_objects_plugin_impl::updateDatabase( const vector<object_id_type>& ids , bool isNew)
bool es_objects_plugin_impl::index_database( const vector<object_id_type>& ids, std::string action)
{
graphene::chain::database &db = _self.database();
const fc::time_point_sec block_time = db.head_block_time();
const uint32_t block_number = db.head_block_num();
block_time = db.head_block_time();
block_number = db.head_block_num();
// check if we are in replay or in sync and change number of bulk documents accordingly
uint32_t limit_documents = 0;
@ -97,38 +98,62 @@ bool es_objects_plugin_impl::updateDatabase( const vector<object_id_type>& ids ,
if(value.is<proposal_object>() && _es_objects_proposals) {
auto obj = db.find_object(value);
auto p = static_cast<const proposal_object*>(obj);
if(p != nullptr)
PrepareProposal(*p, block_time, block_number);
if(p != nullptr) {
if(action == "delete")
remove_from_database(p->id, "proposal");
else
prepare_proposal(*p);
}
}
else if(value.is<account_object>() && _es_objects_accounts) {
auto obj = db.find_object(value);
auto a = static_cast<const account_object*>(obj);
if(a != nullptr)
PrepareAccount(*a, block_time, block_number);
if(a != nullptr) {
if(action == "delete")
remove_from_database(a->id, "account");
else
prepare_account(*a);
}
}
else if(value.is<asset_object>() && _es_objects_assets) {
auto obj = db.find_object(value);
auto a = static_cast<const asset_object*>(obj);
if(a != nullptr)
PrepareAsset(*a, block_time, block_number);
if(a != nullptr) {
if(action == "delete")
remove_from_database(a->id, "asset");
else
prepare_asset(*a);
}
}
else if(value.is<balance_object>() && _es_objects_balances) {
else if(value.is<account_balance_object>() && _es_objects_balances) {
auto obj = db.find_object(value);
auto b = static_cast<const balance_object*>(obj);
if(b != nullptr)
PrepareBalance(*b, block_time, block_number);
auto b = static_cast<const account_balance_object*>(obj);
if(b != nullptr) {
if(action == "delete")
remove_from_database(b->id, "balance");
else
prepare_balance(*b);
}
}
else if(value.is<limit_order_object>() && _es_objects_limit_orders) {
auto obj = db.find_object(value);
auto l = static_cast<const limit_order_object*>(obj);
if(l != nullptr)
PrepareLimit(*l, block_time, block_number);
if(l != nullptr) {
if(action == "delete")
remove_from_database(l->id, "limitorder");
else
prepare_limit(*l);
}
}
else if(value.is<asset_bitasset_data_object>() && _es_objects_asset_bitasset) {
auto obj = db.find_object(value);
auto ba = static_cast<const asset_bitasset_data_object*>(obj);
if(ba != nullptr)
PrepareBitAsset(*ba, block_time, block_number);
if(ba != nullptr) {
if(action == "delete")
remove_from_database(ba->id, "bitasset");
else
prepare_bitasset(*ba);
}
}
}
@ -149,8 +174,23 @@ bool es_objects_plugin_impl::updateDatabase( const vector<object_id_type>& ids ,
return true;
}
void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_object,
const fc::time_point_sec& block_time, const uint32_t& block_number)
void es_objects_plugin_impl::remove_from_database( object_id_type id, std::string index)
{
if(_es_objects_keep_only_current)
{
fc::mutable_variant_object delete_line;
delete_line["_id"] = string(id);
delete_line["_index"] = _es_objects_index_prefix + index;
delete_line["_type"] = "data";
fc::mutable_variant_object final_delete_line;
final_delete_line["delete"] = delete_line;
prepare.push_back(fc::json::to_string(final_delete_line));
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
prepare.clear();
}
}
void es_objects_plugin_impl::prepare_proposal(const proposal_object& proposal_object)
{
proposal_struct prop;
prop.object_id = proposal_object.id;
@ -165,27 +205,22 @@ void es_objects_plugin_impl::PrepareProposal(const proposal_object& proposal_obj
prop.available_key_approvals = fc::json::to_string(proposal_object.available_key_approvals);
prop.proposer = proposal_object.proposer;
auto it = proposals.find(proposal_object.id);
if(it == proposals.end())
proposals[proposal_object.id] = prop;
else {
if(it->second == prop) return;
else proposals[proposal_object.id] = prop;
}
std::string data = fc::json::to_string(prop);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = _es_objects_index_prefix + "proposal";
bulk_header["_type"] = "data";
if(_es_objects_keep_only_current)
{
bulk_header["_id"] = string(prop.object_id);
}
prepare = graphene::utilities::createBulk(bulk_header, data);
bulk.insert(bulk.end(), prepare.begin(), prepare.end());
prepare = graphene::utilities::createBulk(bulk_header, std::move(data));
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
prepare.clear();
}
void es_objects_plugin_impl::PrepareAccount(const account_object& account_object,
const fc::time_point_sec& block_time, const uint32_t& block_number)
void es_objects_plugin_impl::prepare_account(const account_object& account_object)
{
account_struct acct;
acct.object_id = account_object.id;
@ -206,28 +241,24 @@ void es_objects_plugin_impl::PrepareAccount(const account_object& account_object
acct.active_key_auths = fc::json::to_string(account_object.active.key_auths);
acct.active_address_auths = fc::json::to_string(account_object.active.address_auths);
acct.voting_account = account_object.options.voting_account;
auto it = accounts.find(account_object.id);
if(it == accounts.end())
accounts[account_object.id] = acct;
else {
if(it->second == acct) return;
else accounts[account_object.id] = acct;
}
acct.votes = fc::json::to_string(account_object.options.votes);
std::string data = fc::json::to_string(acct);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = _es_objects_index_prefix + "acount";
bulk_header["_index"] = _es_objects_index_prefix + "account";
bulk_header["_type"] = "data";
if(_es_objects_keep_only_current)
{
bulk_header["_id"] = string(acct.object_id);
}
prepare = graphene::utilities::createBulk(bulk_header, data);
bulk.insert(bulk.end(), prepare.begin(), prepare.end());
prepare = graphene::utilities::createBulk(bulk_header, std::move(data));
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
prepare.clear();
}
void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object,
const fc::time_point_sec& block_time, const uint32_t& block_number)
void es_objects_plugin_impl::prepare_asset(const asset_object& asset_object)
{
asset_struct asset;
asset.object_id = asset_object.id;
@ -239,56 +270,47 @@ void es_objects_plugin_impl::PrepareAsset(const asset_object& asset_object,
asset.dynamic_asset_data_id = asset_object.dynamic_asset_data_id;
asset.bitasset_data_id = asset_object.bitasset_data_id;
auto it = assets.find(asset_object.id);
if(it == assets.end())
assets[asset_object.id] = asset;
else {
if(it->second == asset) return;
else assets[asset_object.id] = asset;
}
std::string data = fc::json::to_string(asset);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = _es_objects_index_prefix + "asset";
bulk_header["_type"] = "data";
if(_es_objects_keep_only_current)
{
bulk_header["_id"] = string(asset.object_id);
}
prepare = graphene::utilities::createBulk(bulk_header, data);
bulk.insert(bulk.end(), prepare.begin(), prepare.end());
prepare = graphene::utilities::createBulk(bulk_header, std::move(data));
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
prepare.clear();
}
void es_objects_plugin_impl::PrepareBalance(const balance_object& balance_object,
const fc::time_point_sec& block_time, const uint32_t& block_number)
void es_objects_plugin_impl::prepare_balance(const account_balance_object& account_balance_object)
{
balance_struct balance;
balance.object_id = balance_object.id;
balance.object_id = account_balance_object.id;
balance.block_time = block_time;
balance.block_number = block_number;balance.owner = balance_object.owner;
balance.asset_id = balance_object.balance.asset_id;
balance.amount = balance_object.balance.amount;
auto it = balances.find(balance_object.id);
if(it == balances.end())
balances[balance_object.id] = balance;
else {
if(it->second == balance) return;
else balances[balance_object.id] = balance;
}
balance.block_number = block_number;
balance.owner = account_balance_object.owner;
balance.asset_type = account_balance_object.asset_type;
balance.balance = account_balance_object.balance;
std::string data = fc::json::to_string(balance);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = _es_objects_index_prefix + "balance";
bulk_header["_type"] = "data";
if(_es_objects_keep_only_current)
{
bulk_header["_id"] = string(balance.object_id);
}
prepare = graphene::utilities::createBulk(bulk_header, data);
bulk.insert(bulk.end(), prepare.begin(), prepare.end());
prepare = graphene::utilities::createBulk(bulk_header, std::move(data));
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
prepare.clear();
}
void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object,
const fc::time_point_sec& block_time, const uint32_t& block_number)
void es_objects_plugin_impl::prepare_limit(const limit_order_object& limit_object)
{
limit_order_struct limit;
limit.object_id = limit_object.id;
@ -300,27 +322,22 @@ void es_objects_plugin_impl::PrepareLimit(const limit_order_object& limit_object
limit.sell_price = limit_object.sell_price;
limit.deferred_fee = limit_object.deferred_fee;
auto it = limit_orders.find(limit_object.id);
if(it == limit_orders.end())
limit_orders[limit_object.id] = limit;
else {
if(it->second == limit) return;
else limit_orders[limit_object.id] = limit;
}
std::string data = fc::json::to_string(limit);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = _es_objects_index_prefix + "limitorder";
bulk_header["_type"] = "data";
if(_es_objects_keep_only_current)
{
bulk_header["_id"] = string(limit.object_id);
}
prepare = graphene::utilities::createBulk(bulk_header, data);
bulk.insert(bulk.end(), prepare.begin(), prepare.end());
prepare = graphene::utilities::createBulk(bulk_header, std::move(data));
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
prepare.clear();
}
void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& bitasset_object,
const fc::time_point_sec& block_time, const uint32_t& block_number)
void es_objects_plugin_impl::prepare_bitasset(const asset_bitasset_data_object& bitasset_object)
{
if(!bitasset_object.is_prediction_market) {
@ -331,22 +348,18 @@ void es_objects_plugin_impl::PrepareBitAsset(const asset_bitasset_data_object& b
bitasset.current_feed = fc::json::to_string(bitasset_object.current_feed);
bitasset.current_feed_publication_time = bitasset_object.current_feed_publication_time;
auto it = bitassets.find(bitasset_object.id);
if(it == bitassets.end())
bitassets[bitasset_object.id] = bitasset;
else {
if(it->second == bitasset) return;
else bitassets[bitasset_object.id] = bitasset;
}
std::string data = fc::json::to_string(bitasset);
fc::mutable_variant_object bulk_header;
bulk_header["_index"] = _es_objects_index_prefix + "bitasset";
bulk_header["_type"] = "data";
if(_es_objects_keep_only_current)
{
bulk_header["_id"] = string(bitasset.object_id);
}
prepare = graphene::utilities::createBulk(bulk_header, data);
bulk.insert(bulk.end(), prepare.begin(), prepare.end());
prepare = graphene::utilities::createBulk(bulk_header, std::move(data));
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk));
prepare.clear();
}
}
@ -382,17 +395,18 @@ void es_objects_plugin::plugin_set_program_options(
)
{
cli.add_options()
("es-objects-elasticsearch-url", boost::program_options::value<std::string>(), "Elasticsearch node url")
("es-objects-auth", boost::program_options::value<std::string>(), "Basic auth username:password")
("es-objects-bulk-replay", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on replay(5000)")
("es-objects-bulk-sync", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on a syncronied chain(10)")
("es-objects-proposals", boost::program_options::value<bool>(), "Store proposal objects")
("es-objects-accounts", boost::program_options::value<bool>(), "Store account objects")
("es-objects-assets", boost::program_options::value<bool>(), "Store asset objects")
("es-objects-balances", boost::program_options::value<bool>(), "Store balances objects")
("es-objects-limit-orders", boost::program_options::value<bool>(), "Store limit order objects")
("es-objects-asset-bitasset", boost::program_options::value<bool>(), "Store feed data")
("es-objects-elasticsearch-url", boost::program_options::value<std::string>(), "Elasticsearch node url(http://localhost:9200/)")
("es-objects-auth", boost::program_options::value<std::string>(), "Basic auth username:password('')")
("es-objects-bulk-replay", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on replay(10000)")
("es-objects-bulk-sync", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on a synchronized chain(100)")
("es-objects-proposals", boost::program_options::value<bool>(), "Store proposal objects(true)")
("es-objects-accounts", boost::program_options::value<bool>(), "Store account objects(true)")
("es-objects-assets", boost::program_options::value<bool>(), "Store asset objects(true)")
("es-objects-balances", boost::program_options::value<bool>(), "Store balances objects(true)")
("es-objects-limit-orders", boost::program_options::value<bool>(), "Store limit order objects(true)")
("es-objects-asset-bitasset", boost::program_options::value<bool>(), "Store feed data(true)")
("es-objects-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(objects-)")
("es-objects-keep-only-current", boost::program_options::value<bool>(), "Keep only current state of the objects(true)")
;
cfg.add(cli);
}
@ -400,17 +414,25 @@ void es_objects_plugin::plugin_set_program_options(
void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
database().new_objects.connect([&]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
if(!my->updateDatabase(ids, 1))
if(!my->index_database(ids, "create"))
{
FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying.");
FC_THROW_EXCEPTION(fc::exception, "Error creating object from ES database, we are going to keep trying.");
}
});
database().changed_objects.connect([&]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
if(!my->updateDatabase(ids, 0))
if(!my->index_database(ids, "update"))
{
FC_THROW_EXCEPTION(fc::exception, "Error populating ES database, we are going to keep trying.");
FC_THROW_EXCEPTION(fc::exception, "Error updating object from ES database, we are going to keep trying.");
}
});
database().removed_objects.connect([this](const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts) {
if(!my->index_database(ids, "delete"))
{
FC_THROW_EXCEPTION(fc::exception, "Error deleting object from ES database, we are going to keep trying.");
}
});
if (options.count("es-objects-elasticsearch-url")) {
my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as<std::string>();
}
@ -444,6 +466,9 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable
if (options.count("es-objects-index-prefix")) {
my->_es_objects_index_prefix = options["es-objects-index-prefix"].as<std::string>();
}
if (options.count("es-objects-keep-only-current")) {
my->_es_objects_keep_only_current = options["es-objects-keep-only-current"].as<bool>();
}
}
void es_objects_plugin::plugin_startup()

View file

@ -102,6 +102,7 @@ struct account_struct {
string active_key_auths;
string active_address_auths;
account_id_type voting_account;
string votes;
friend bool operator==(const account_struct& l, const account_struct& r)
{
@ -109,11 +110,11 @@ struct account_struct {
l.lifetime_referrer, l.network_fee_percentage, l.lifetime_referrer_fee_percentage,
l.referrer_rewards_percentage, l.name, l.owner_account_auths, l.owner_key_auths,
l.owner_address_auths, l.active_account_auths, l.active_key_auths, l.active_address_auths,
l.voting_account) == std::tie(r.object_id, r.block_time, r.block_number, r.membership_expiration_date, r.registrar, r.referrer,
l.voting_account, l.votes) == std::tie(r.object_id, r.block_time, r.block_number, r.membership_expiration_date, r.registrar, r.referrer,
r.lifetime_referrer, r.network_fee_percentage, r.lifetime_referrer_fee_percentage,
r.referrer_rewards_percentage, r.name, r.owner_account_auths, r.owner_key_auths,
r.owner_address_auths, r.active_account_auths, r.active_key_auths, r.active_address_auths,
r.voting_account);
r.voting_account, r.votes);
}
friend bool operator!=(const account_struct& l, const account_struct& r)
{
@ -146,14 +147,14 @@ struct balance_struct {
object_id_type object_id;
fc::time_point_sec block_time;
uint32_t block_number;
address owner;
asset_id_type asset_id;
share_type amount;
account_id_type owner;
asset_id_type asset_type;
share_type balance;
friend bool operator==(const balance_struct& l, const balance_struct& r)
{
return std::tie(l.object_id, l.block_time, l.block_number, l.block_time, l.owner, l.asset_id, l.amount)
== std::tie(r.object_id, r.block_time, r.block_number, r.block_time, r.owner, r.asset_id, r.amount);
return std::tie(l.object_id, l.block_time, l.block_number, l.block_time, l.owner, l.asset_type, l.balance)
== std::tie(r.object_id, r.block_time, r.block_number, r.block_time, r.owner, r.asset_type, r.balance);
}
friend bool operator!=(const balance_struct& l, const balance_struct& r)
{
@ -201,9 +202,30 @@ struct bitasset_struct {
} } //graphene::es_objects
FC_REFLECT( graphene::es_objects::proposal_struct, (object_id)(block_time)(block_number)(expiration_time)(review_period_time)(proposed_transaction)(required_active_approvals)(available_active_approvals)(required_owner_approvals)(available_owner_approvals)(available_key_approvals)(proposer) )
FC_REFLECT( graphene::es_objects::account_struct, (object_id)(block_time)(block_number)(membership_expiration_date)(registrar)(referrer)(lifetime_referrer)(network_fee_percentage)(lifetime_referrer_fee_percentage)(referrer_rewards_percentage)(name)(owner_account_auths)(owner_key_auths)(owner_address_auths)(active_account_auths)(active_key_auths)(active_address_auths)(voting_account) )
FC_REFLECT( graphene::es_objects::asset_struct, (object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id) )
FC_REFLECT( graphene::es_objects::balance_struct, (object_id)(block_time)(block_number)(block_time)(owner)(asset_id)(amount) )
FC_REFLECT( graphene::es_objects::limit_order_struct, (object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee) )
FC_REFLECT( graphene::es_objects::bitasset_struct, (object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time) )
FC_REFLECT(
graphene::es_objects::proposal_struct,
(object_id)(block_time)(block_number)(expiration_time)(review_period_time)(proposed_transaction)(required_active_approvals)
(available_active_approvals)(required_owner_approvals)(available_owner_approvals)(available_key_approvals)(proposer)
)
FC_REFLECT(
graphene::es_objects::account_struct,
(object_id)(block_time)(block_number)(membership_expiration_date)(registrar)(referrer)(lifetime_referrer)
(network_fee_percentage)(lifetime_referrer_fee_percentage)(referrer_rewards_percentage)(name)(owner_account_auths)
(owner_key_auths)(owner_address_auths)(active_account_auths)(active_key_auths)(active_address_auths)(voting_account)(votes)
)
FC_REFLECT(
graphene::es_objects::asset_struct,
(object_id)(block_time)(block_number)(symbol)(issuer)(is_market_issued)(dynamic_asset_data_id)(bitasset_data_id)
)
FC_REFLECT(
graphene::es_objects::balance_struct,
(object_id)(block_time)(block_number)(owner)(asset_type)(balance)
)
FC_REFLECT(
graphene::es_objects::limit_order_struct,
(object_id)(block_time)(block_number)(expiration)(seller)(for_sale)(sell_price)(deferred_fee)
)
FC_REFLECT(
graphene::es_objects::bitasset_struct,
(object_id)(block_time)(block_number)(current_feed)(current_feed_publication_time)
)