Merge pull request #1717 from oxarbitrage/issue1652

add genesis data to es_objects
This commit is contained in:
Alfredo Garcia 2019-04-23 07:56:14 -03:00 committed by gladcow
parent 29ed4383b8
commit c6fef71398

View file

@ -48,8 +48,9 @@ class es_objects_plugin_impl
{ curl = curl_easy_init(); }
virtual ~es_objects_plugin_impl();
bool index_database( const vector<object_id_type>& ids, std::string action);
void remove_from_database( object_id_type id, std::string index);
bool index_database(const vector<object_id_type>& ids, std::string action);
bool genesis();
void remove_from_database(object_id_type id, std::string index);
es_objects_plugin& _self;
std::string _es_objects_elasticsearch_url = "http://localhost:9200/";
@ -78,7 +79,55 @@ class es_objects_plugin_impl
void prepareTemplate(T blockchain_object, string index_name);
};
bool es_objects_plugin_impl::index_database( const vector<object_id_type>& ids, std::string action)
bool es_objects_plugin_impl::genesis()
{
ilog("elasticsearch OBJECTS: inserting data from genesis");
graphene::chain::database &db = _self.database();
block_number = db.head_block_num();
block_time = db.head_block_time();
if (_es_objects_accounts) {
auto &index_accounts = db.get_index(1, 2);
index_accounts.inspect_all_objects([this, &db](const graphene::db::object &o) {
auto obj = db.find_object(o.id);
auto a = static_cast<const account_object *>(obj);
prepareTemplate<account_object>(*a, "account");
});
}
if (_es_objects_assets) {
auto &index_assets = db.get_index(1, 3);
index_assets.inspect_all_objects([this, &db](const graphene::db::object &o) {
auto obj = db.find_object(o.id);
auto a = static_cast<const asset_object *>(obj);
prepareTemplate<asset_object>(*a, "asset");
});
}
if (_es_objects_balances) {
auto &index_balances = db.get_index(2, 5);
index_balances.inspect_all_objects([this, &db](const graphene::db::object &o) {
auto obj = db.find_object(o.id);
auto b = static_cast<const account_balance_object *>(obj);
prepareTemplate<account_balance_object>(*b, "balance");
});
}
graphene::utilities::ES es;
es.curl = curl;
es.bulk_lines = bulk;
es.elasticsearch_url = _es_objects_elasticsearch_url;
es.auth = _es_objects_auth;
if (!graphene::utilities::SendBulk(es))
FC_THROW_EXCEPTION(fc::exception, "Error inserting genesis data.");
else
bulk.clear();
return true;
}
bool es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, std::string action)
{
graphene::chain::database &db = _self.database();
@ -268,13 +317,20 @@ void es_objects_plugin::plugin_set_program_options(
void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
database().new_objects.connect([&]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
database().applied_block.connect([this](const signed_block &b) {
if(b.block_num() == 1) {
if (!my->genesis())
FC_THROW_EXCEPTION(fc::exception, "Error populating genesis data.");
}
});
database().new_objects.connect([this]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
if(!my->index_database(ids, "create"))
{
FC_THROW_EXCEPTION(fc::exception, "Error creating object from ES database, we are going to keep trying.");
}
});
database().changed_objects.connect([&]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
database().changed_objects.connect([this]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
if(!my->index_database(ids, "update"))
{
FC_THROW_EXCEPTION(fc::exception, "Error updating object from ES database, we are going to keep trying.");