Compare commits
5 commits
master
...
feature/es
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
515dc52199 | ||
|
|
d36491c747 | ||
|
|
ed36462290 | ||
|
|
a64e002b52 | ||
|
|
94f420e852 |
6 changed files with 537 additions and 79 deletions
|
|
@ -22,6 +22,34 @@ endif()
|
|||
|
||||
list( APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMakeModules" )
|
||||
|
||||
# function to help with cUrl
|
||||
macro(FIND_CURL)
|
||||
if (NOT WIN32 AND NOT APPLE AND CURL_STATICLIB)
|
||||
find_package(OpenSSL REQUIRED)
|
||||
set (OLD_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
|
||||
set (CMAKE_FIND_LIBRARY_SUFFIXES .a)
|
||||
find_package(CURL REQUIRED)
|
||||
list(APPEND CURL_LIBRARIES ${OPENSSL_LIBRARIES} ${BOOST_THREAD_LIBRARY} ${CMAKE_DL_LIBS})
|
||||
set (CMAKE_FIND_LIBRARY_SUFFIXES ${OLD_SUFFIXES})
|
||||
else (NOT WIN32 AND NOT APPLE AND CURL_STATICLIB)
|
||||
find_package(CURL REQUIRED)
|
||||
endif (NOT WIN32 AND NOT APPLE AND CURL_STATICLIB)
|
||||
|
||||
if( WIN32 )
|
||||
if ( MSVC )
|
||||
list( APPEND CURL_LIBRARIES Wldap32 )
|
||||
endif( MSVC )
|
||||
|
||||
if( MINGW )
|
||||
# MinGW requires a specific order of included libraries ( CURL before ZLib )
|
||||
find_package( ZLIB REQUIRED )
|
||||
list( APPEND CURL_LIBRARIES ${ZLIB_LIBRARY} pthread )
|
||||
endif( MINGW )
|
||||
|
||||
list( APPEND CURL_LIBRARIES ${PLATFORM_SPECIFIC_LIBS} )
|
||||
endif( WIN32 )
|
||||
endmacro()
|
||||
|
||||
set(CMAKE_EXPORT_COMPILE_COMMANDS "ON")
|
||||
set(GRAPHENE_EGENESIS_JSON "${CMAKE_CURRENT_SOURCE_DIR}/genesis.json" CACHE PATH "location of the genesis.json to embed in the executable" )
|
||||
|
||||
|
|
|
|||
|
|
@ -4,13 +4,21 @@ add_library( graphene_elasticsearch
|
|||
elasticsearch_plugin.cpp
|
||||
)
|
||||
|
||||
target_link_libraries( graphene_elasticsearch PRIVATE graphene_plugin curl )
|
||||
target_include_directories( graphene_elasticsearch
|
||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" )
|
||||
find_curl()
|
||||
|
||||
include_directories(${CURL_INCLUDE_DIRS})
|
||||
if(MSVC)
|
||||
set_source_files_properties(elasticsearch_plugin.cpp PROPERTIES COMPILE_FLAGS "/bigobj" )
|
||||
endif(MSVC)
|
||||
if(CURL_STATICLIB)
|
||||
SET_TARGET_PROPERTIES(graphene_elasticsearch PROPERTIES
|
||||
COMPILE_DEFINITIONS "CURL_STATICLIB")
|
||||
endif(CURL_STATICLIB)
|
||||
target_link_libraries( graphene_elasticsearch PRIVATE graphene_plugin ${CURL_LIBRARIES} )
|
||||
target_include_directories( graphene_elasticsearch
|
||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
|
||||
PUBLIC "${CURL_INCLUDE_DIR}" )
|
||||
|
||||
|
||||
install( TARGETS
|
||||
graphene_elasticsearch
|
||||
|
|
|
|||
|
|
@ -57,9 +57,9 @@ class elasticsearch_plugin_impl
|
|||
bool _elasticsearch_visitor = false;
|
||||
std::string _elasticsearch_basic_auth = "";
|
||||
std::string _elasticsearch_index_prefix = "peerplays-";
|
||||
bool _elasticsearch_operation_object = false;
|
||||
bool _elasticsearch_operation_object = true;
|
||||
uint32_t _elasticsearch_start_es_after_block = 0;
|
||||
bool _elasticsearch_operation_string = true;
|
||||
bool _elasticsearch_operation_string = false;
|
||||
mode _elasticsearch_mode = mode::only_save;
|
||||
CURL *curl; // curl handler
|
||||
vector <string> bulk_lines; // vector of op lines
|
||||
|
|
@ -75,20 +75,19 @@ class elasticsearch_plugin_impl
|
|||
std::string bulk_line;
|
||||
std::string index_name;
|
||||
bool is_sync = false;
|
||||
fc::time_point last_sync;
|
||||
private:
|
||||
bool add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho, const uint32_t block_number );
|
||||
const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj,
|
||||
const account_id_type account_id,
|
||||
const account_id_type& account_id,
|
||||
const optional <operation_history_object>& oho);
|
||||
const account_statistics_object& getStatsObject(const account_id_type account_id);
|
||||
const account_statistics_object& getStatsObject(const account_id_type& account_id);
|
||||
void growStats(const account_statistics_object& stats_obj, const account_transaction_history_object& ath);
|
||||
void getOperationType(const optional <operation_history_object>& oho);
|
||||
void doOperationHistory(const optional <operation_history_object>& oho);
|
||||
void doBlock(const optional <operation_history_object>& oho, const signed_block& b);
|
||||
void doBlock(uint32_t trx_in_block, const signed_block& b);
|
||||
void doVisitor(const optional <operation_history_object>& oho);
|
||||
void checkState(const fc::time_point_sec& block_time);
|
||||
void cleanObjects(const account_transaction_history_object& ath, account_id_type account_id);
|
||||
void cleanObjects(const account_transaction_history_id_type& ath, const account_id_type& account_id);
|
||||
void createBulkLine(const account_transaction_history_object& ath);
|
||||
void prepareBulk(const account_transaction_history_id_type& ath_id);
|
||||
void populateESstruct();
|
||||
|
|
@ -148,7 +147,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b
|
|||
// populate what we can before impacted loop
|
||||
getOperationType(oho);
|
||||
doOperationHistory(oho);
|
||||
doBlock(oho, b);
|
||||
doBlock(oho->trx_in_block, b);
|
||||
if(_elasticsearch_visitor)
|
||||
doVisitor(oho);
|
||||
|
||||
|
|
@ -174,7 +173,11 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b
|
|||
for( auto& account_id : impacted )
|
||||
{
|
||||
if(!add_elasticsearch( account_id, oho, b.block_num() ))
|
||||
{
|
||||
elog( "Error adding data to Elastic Search: block num ${b}, account ${a}, data ${d}",
|
||||
("b",b.block_num()) ("a",account_id) ("d", oho) );
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
// we send bulk at end of block when we are in sync for better real time client experience
|
||||
|
|
@ -185,23 +188,33 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b
|
|||
{
|
||||
prepare.clear();
|
||||
if(!graphene::utilities::SendBulk(es))
|
||||
{
|
||||
// Note: although called with `std::move()`, `es` is not updated in `SendBulk()`
|
||||
elog( "Error sending ${n} lines of bulk data to Elastic Search, the first lines are:",
|
||||
("n",es.bulk_lines.size()) );
|
||||
for( size_t i = 0; i < es.bulk_lines.size() && i < 10; ++i )
|
||||
{
|
||||
edump( (es.bulk_lines[i]) );
|
||||
}
|
||||
return false;
|
||||
}
|
||||
else
|
||||
bulk_lines.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if(bulk_lines.size() != limit_documents)
|
||||
bulk_lines.reserve(limit_documents);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time)
|
||||
{
|
||||
fc::time_point current_time(fc::time_point::now());
|
||||
if(((current_time - block_time) < fc::seconds(30)) || (current_time - last_sync > fc::seconds(60)))
|
||||
if((fc::time_point::now() - block_time) < fc::seconds(30))
|
||||
{
|
||||
limit_documents = _elasticsearch_bulk_sync;
|
||||
is_sync = true;
|
||||
last_sync = current_time;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -232,11 +245,11 @@ void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_his
|
|||
os.op = fc::json::to_string(oho->op);
|
||||
}
|
||||
|
||||
void elasticsearch_plugin_impl::doBlock(const optional <operation_history_object>& oho, const signed_block& b)
|
||||
void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block, const signed_block& b)
|
||||
{
|
||||
std::string trx_id = "";
|
||||
if(oho->trx_in_block < b.transactions.size())
|
||||
trx_id = b.transactions[oho->trx_in_block].id().str();
|
||||
if(trx_in_block < b.transactions.size())
|
||||
trx_id = b.transactions[trx_in_block].id().str();
|
||||
bs.block_num = b.block_num();
|
||||
bs.block_time = b.timestamp;
|
||||
bs.trx_id = trx_id;
|
||||
|
|
@ -244,23 +257,41 @@ void elasticsearch_plugin_impl::doBlock(const optional <operation_history_object
|
|||
|
||||
void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_object>& oho)
|
||||
{
|
||||
graphene::chain::database& db = database();
|
||||
|
||||
operation_visitor o_v;
|
||||
oho->op.visit(o_v);
|
||||
|
||||
auto fee_asset = o_v.fee_asset(db);
|
||||
vs.fee_data.asset = o_v.fee_asset;
|
||||
vs.fee_data.asset_name = fee_asset.symbol;
|
||||
vs.fee_data.amount = o_v.fee_amount;
|
||||
vs.fee_data.amount_units = (o_v.fee_amount.value)/(double)asset::scaled_precision(fee_asset.precision).value;
|
||||
|
||||
auto transfer_asset = o_v.transfer_asset_id(db);
|
||||
vs.transfer_data.asset = o_v.transfer_asset_id;
|
||||
vs.transfer_data.asset_name = transfer_asset.symbol;
|
||||
vs.transfer_data.amount = o_v.transfer_amount;
|
||||
vs.transfer_data.amount_units = (o_v.transfer_amount.value)/(double)asset::scaled_precision(transfer_asset.precision).value;
|
||||
vs.transfer_data.from = o_v.transfer_from;
|
||||
vs.transfer_data.to = o_v.transfer_to;
|
||||
|
||||
auto fill_pays_asset = o_v.fill_pays_asset_id(db);
|
||||
auto fill_receives_asset = o_v.fill_receives_asset_id(db);
|
||||
vs.fill_data.order_id = o_v.fill_order_id;
|
||||
vs.fill_data.account_id = o_v.fill_account_id;
|
||||
vs.fill_data.pays_asset_id = o_v.fill_pays_asset_id;
|
||||
vs.fill_data.pays_asset_name = fill_pays_asset.symbol;
|
||||
vs.fill_data.pays_amount = o_v.fill_pays_amount;
|
||||
vs.fill_data.pays_amount_units = (o_v.fill_pays_amount.value)/(double)asset::scaled_precision(fill_pays_asset.precision).value;
|
||||
vs.fill_data.receives_asset_id = o_v.fill_receives_asset_id;
|
||||
vs.fill_data.receives_asset_name = fill_receives_asset.symbol;
|
||||
vs.fill_data.receives_amount = o_v.fill_receives_amount;
|
||||
vs.fill_data.receives_amount_units = (o_v.fill_receives_amount.value)/(double)asset::scaled_precision(fill_receives_asset.precision).value;
|
||||
|
||||
auto fill_price = (o_v.fill_receives_amount.value/(double)asset::scaled_precision(fill_receives_asset.precision).value) /
|
||||
(o_v.fill_pays_amount.value/(double)asset::scaled_precision(fill_pays_asset.precision).value);
|
||||
vs.fill_data.fill_price_units = fill_price;
|
||||
//vs.fill_data.fill_price = o_v.fill_fill_price;
|
||||
//vs.fill_data.is_maker = o_v.fill_is_maker;
|
||||
}
|
||||
|
|
@ -276,13 +307,22 @@ bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account
|
|||
createBulkLine(ath);
|
||||
prepareBulk(ath.id);
|
||||
}
|
||||
cleanObjects(ath, account_id);
|
||||
cleanObjects(ath.id, account_id);
|
||||
|
||||
if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech
|
||||
prepare.clear();
|
||||
populateESstruct();
|
||||
if(!graphene::utilities::SendBulk(es))
|
||||
{
|
||||
// Note: although called with `std::move()`, `es` is not updated in `SendBulk()`
|
||||
elog( "Error sending ${n} lines of bulk data to Elastic Search, the first lines are:",
|
||||
("n",es.bulk_lines.size()) );
|
||||
for( size_t i = 0; i < es.bulk_lines.size() && i < 10; ++i )
|
||||
{
|
||||
edump( (es.bulk_lines[i]) );
|
||||
}
|
||||
return false;
|
||||
}
|
||||
else
|
||||
bulk_lines.clear();
|
||||
}
|
||||
|
|
@ -290,15 +330,16 @@ bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account
|
|||
return true;
|
||||
}
|
||||
|
||||
const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type account_id)
|
||||
const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type& account_id)
|
||||
{
|
||||
graphene::chain::database& db = database();
|
||||
const auto &acct = db.get<account_object>(account_id);
|
||||
return acct.statistics(db);
|
||||
const auto &stats_obj = db.get_account_stats_by_owner(account_id);
|
||||
|
||||
return stats_obj;
|
||||
}
|
||||
|
||||
const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry(const account_statistics_object& stats_obj,
|
||||
const account_id_type account_id,
|
||||
const account_id_type& account_id,
|
||||
const optional <operation_history_object>& oho)
|
||||
{
|
||||
graphene::chain::database& db = database();
|
||||
|
|
@ -340,19 +381,21 @@ void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id
|
|||
fc::mutable_variant_object bulk_header;
|
||||
bulk_header["_index"] = index_name;
|
||||
bulk_header["_type"] = "data";
|
||||
bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "." + ath_id.instance;
|
||||
prepare = graphene::utilities::createBulk(bulk_header, bulk_line);
|
||||
bulk_lines.insert(bulk_lines.end(), prepare.begin(), prepare.end());
|
||||
bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "."
|
||||
+ fc::to_string(ath_id.instance.value);
|
||||
prepare = graphene::utilities::createBulk(bulk_header, std::move(bulk_line));
|
||||
std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
|
||||
prepare.clear();
|
||||
}
|
||||
|
||||
void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_object& ath, account_id_type account_id)
|
||||
void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_id_type& ath_id, const account_id_type& account_id)
|
||||
{
|
||||
graphene::chain::database& db = database();
|
||||
// remove everything except current object from ath
|
||||
const auto &his_idx = db.get_index_type<account_transaction_history_index>();
|
||||
const auto &by_seq_idx = his_idx.indices().get<by_seq>();
|
||||
auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0));
|
||||
if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.id) {
|
||||
if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath_id) {
|
||||
// if found, remove the entry
|
||||
const auto remove_op_id = itr->operation_id;
|
||||
const auto itr_remove = itr;
|
||||
|
|
@ -377,9 +420,12 @@ void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_o
|
|||
void elasticsearch_plugin_impl::populateESstruct()
|
||||
{
|
||||
es.curl = curl;
|
||||
es.bulk_lines = bulk_lines;
|
||||
es.bulk_lines = std::move(bulk_lines);
|
||||
es.elasticsearch_url = _elasticsearch_node_url;
|
||||
es.auth = _elasticsearch_basic_auth;
|
||||
es.index_prefix = _elasticsearch_index_prefix;
|
||||
es.endpoint = "";
|
||||
es.query = "";
|
||||
}
|
||||
|
||||
} // end namespace detail
|
||||
|
|
@ -421,11 +467,11 @@ void elasticsearch_plugin::plugin_set_program_options(
|
|||
("elasticsearch-index-prefix", boost::program_options::value<std::string>(),
|
||||
"Add a prefix to the index(peerplays-)")
|
||||
("elasticsearch-operation-object", boost::program_options::value<bool>(),
|
||||
"Save operation as object(false)")
|
||||
"Save operation as object(true)")
|
||||
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
|
||||
"Start doing ES job after block(0)")
|
||||
("elasticsearch-operation-string", boost::program_options::value<bool>(),
|
||||
"Save operation as string. Needed to serve history api calls(true)")
|
||||
"Save operation as string. Needed to serve history api calls(false)")
|
||||
("elasticsearch-mode", boost::program_options::value<uint16_t>(),
|
||||
"Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
|
||||
;
|
||||
|
|
@ -467,18 +513,18 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
|
|||
if (options.count("elasticsearch-mode")) {
|
||||
const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
|
||||
if(option_number > mode::all)
|
||||
FC_THROW_EXCEPTION(fc::exception, "Elasticsearch mode not valid");
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Elasticsearch mode not valid");
|
||||
my->_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
|
||||
}
|
||||
|
||||
if(my->_elasticsearch_mode != mode::only_query) {
|
||||
if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string)
|
||||
FC_THROW_EXCEPTION(fc::exception,
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
|
||||
"If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");
|
||||
|
||||
database().applied_block.connect([this](const signed_block &b) {
|
||||
if (!my->update_account_histories(b))
|
||||
FC_THROW_EXCEPTION(fc::exception,
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
|
||||
"Error populating ES database, we are going to keep trying.");
|
||||
});
|
||||
}
|
||||
|
|
@ -563,13 +609,12 @@ vector<operation_history_object> elasticsearch_plugin::get_account_history(
|
|||
const auto response = graphene::utilities::simpleQuery(es);
|
||||
variant variant_response = fc::json::from_string(response);
|
||||
|
||||
const auto hits = variant_response["hits"]["total"]["value"];
|
||||
const auto hits = variant_response["hits"]["total"];
|
||||
uint32_t size;
|
||||
if( hits.is_object() ) // ES-7 ?
|
||||
size = static_cast<uint32_t>(hits["value"].as_uint64());
|
||||
else // probably ES-6
|
||||
size = static_cast<uint32_t>(hits.as_uint64());
|
||||
|
||||
size = std::min( size, limit );
|
||||
|
||||
for(unsigned i=0; i<size; i++)
|
||||
|
|
|
|||
|
|
@ -152,12 +152,16 @@ struct block_struct {
|
|||
|
||||
struct fee_struct {
|
||||
asset_id_type asset;
|
||||
std::string asset_name;
|
||||
share_type amount;
|
||||
double amount_units;
|
||||
};
|
||||
|
||||
struct transfer_struct {
|
||||
asset_id_type asset;
|
||||
std::string asset_name;
|
||||
share_type amount;
|
||||
double amount_units;
|
||||
account_id_type from;
|
||||
account_id_type to;
|
||||
};
|
||||
|
|
@ -166,10 +170,15 @@ struct fill_struct {
|
|||
object_id_type order_id;
|
||||
account_id_type account_id;
|
||||
asset_id_type pays_asset_id;
|
||||
std::string pays_asset_name;
|
||||
share_type pays_amount;
|
||||
double pays_amount_units;
|
||||
asset_id_type receives_asset_id;
|
||||
std::string receives_asset_name;
|
||||
share_type receives_amount;
|
||||
double receives_amount_units;
|
||||
double fill_price;
|
||||
double fill_price_units;
|
||||
bool is_maker;
|
||||
};
|
||||
|
||||
|
|
@ -258,6 +267,23 @@ struct adaptor_struct {
|
|||
{
|
||||
o["initializer"] = fc::json::to_string(o["initializer"]);
|
||||
}
|
||||
if (o.find("policy") != o.end())
|
||||
{
|
||||
o["policy"] = fc::json::to_string(o["policy"]);
|
||||
}
|
||||
if (o.find("predicates") != o.end())
|
||||
{
|
||||
o["predicates"] = fc::json::to_string(o["predicates"]);
|
||||
}
|
||||
if (o.find("active_special_authority") != o.end())
|
||||
{
|
||||
o["active_special_authority"] = fc::json::to_string(o["active_special_authority"]);
|
||||
}
|
||||
if (o.find("owner_special_authority") != o.end())
|
||||
{
|
||||
o["owner_special_authority"] = fc::json::to_string(o["owner_special_authority"]);
|
||||
}
|
||||
|
||||
|
||||
variant v;
|
||||
fc::to_variant(o, v, FC_PACK_MAX_DEPTH);
|
||||
|
|
@ -277,13 +303,16 @@ struct adaptor_struct {
|
|||
}
|
||||
}
|
||||
};
|
||||
|
||||
} } //graphene::elasticsearch
|
||||
|
||||
FC_REFLECT_ENUM( graphene::elasticsearch::mode, (only_save)(only_query)(all) )
|
||||
FC_REFLECT( graphene::elasticsearch::operation_history_struct, (trx_in_block)(op_in_trx)(operation_result)(virtual_op)(op)(op_object) )
|
||||
FC_REFLECT( graphene::elasticsearch::block_struct, (block_num)(block_time)(trx_id) )
|
||||
FC_REFLECT( graphene::elasticsearch::fee_struct, (asset)(amount) )
|
||||
FC_REFLECT( graphene::elasticsearch::transfer_struct, (asset)(amount)(from)(to) )
|
||||
FC_REFLECT( graphene::elasticsearch::fill_struct, (order_id)(account_id)(pays_asset_id)(pays_amount)(receives_asset_id)(receives_amount)(fill_price)(is_maker))
|
||||
FC_REFLECT( graphene::elasticsearch::fee_struct, (asset)(asset_name)(amount)(amount_units) )
|
||||
FC_REFLECT( graphene::elasticsearch::transfer_struct, (asset)(asset_name)(amount)(amount_units)(from)(to) )
|
||||
FC_REFLECT( graphene::elasticsearch::fill_struct, (order_id)(account_id)(pays_asset_id)(pays_asset_name)(pays_amount)(pays_amount_units)
|
||||
(receives_asset_id)(receives_asset_name)(receives_amount)(receives_amount_units)(fill_price)
|
||||
(fill_price_units)(is_maker))
|
||||
FC_REFLECT( graphene::elasticsearch::visitor_struct, (fee_data)(transfer_data)(fill_data) )
|
||||
FC_REFLECT( graphene::elasticsearch::bulk_struct, (account_history)(operation_history)(operation_type)(operation_id_num)(block_data)(additional_data) )
|
||||
|
|
|
|||
|
|
@ -4,14 +4,22 @@ add_library( graphene_es_objects
|
|||
es_objects.cpp
|
||||
)
|
||||
|
||||
target_link_libraries( graphene_es_objects PRIVATE graphene_plugin curl )
|
||||
target_include_directories( graphene_es_objects
|
||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" )
|
||||
find_curl()
|
||||
|
||||
include_directories(${CURL_INCLUDE_DIRS})
|
||||
if(CURL_STATICLIB)
|
||||
SET_TARGET_PROPERTIES(graphene_es_objects PROPERTIES
|
||||
COMPILE_DEFINITIONS "CURL_STATICLIB")
|
||||
endif(CURL_STATICLIB)
|
||||
if(MSVC)
|
||||
set_source_files_properties(es_objects.cpp PROPERTIES COMPILE_FLAGS "/bigobj" )
|
||||
endif(MSVC)
|
||||
|
||||
target_link_libraries( graphene_es_objects PRIVATE graphene_plugin ${CURL_LIBRARIES} )
|
||||
target_include_directories( graphene_es_objects
|
||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" )
|
||||
|
||||
|
||||
install( TARGETS
|
||||
graphene_es_objects
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,22 @@
|
|||
#include <graphene/chain/asset_object.hpp>
|
||||
#include <graphene/chain/account_object.hpp>
|
||||
|
||||
#include <graphene/chain/account_role_object.hpp>
|
||||
#include <graphene/chain/committee_member_object.hpp>
|
||||
#include <graphene/chain/nft_object.hpp>
|
||||
#include <graphene/chain/offer_object.hpp>
|
||||
#include <graphene/chain/sidechain_address_object.hpp>
|
||||
#include <graphene/chain/sidechain_transaction_object.hpp>
|
||||
#include <graphene/chain/son_object.hpp>
|
||||
#include <graphene/chain/son_proposal_object.hpp>
|
||||
#include <graphene/chain/son_wallet_object.hpp>
|
||||
#include <graphene/chain/son_wallet_deposit_object.hpp>
|
||||
#include <graphene/chain/son_wallet_withdraw_object.hpp>
|
||||
#include <graphene/chain/transaction_object.hpp>
|
||||
#include <graphene/chain/vesting_balance_object.hpp>
|
||||
#include <graphene/chain/witness_object.hpp>
|
||||
#include <graphene/chain/worker_object.hpp>
|
||||
|
||||
#include <graphene/utilities/elasticsearch.hpp>
|
||||
|
||||
namespace graphene { namespace es_objects {
|
||||
|
|
@ -61,6 +77,16 @@ class es_objects_plugin_impl
|
|||
bool _es_objects_balances = true;
|
||||
bool _es_objects_limit_orders = true;
|
||||
bool _es_objects_asset_bitasset = true;
|
||||
|
||||
bool _es_objects_account_role = true;
|
||||
bool _es_objects_committee_member = true;
|
||||
bool _es_objects_nft = true;
|
||||
bool _es_objects_son = true;
|
||||
bool _es_objects_transaction = true;
|
||||
bool _es_objects_vesting_balance = true;
|
||||
bool _es_objects_witness = true;
|
||||
bool _es_objects_worker = true;
|
||||
|
||||
std::string _es_objects_index_prefix = "ppobjects-";
|
||||
uint32_t _es_objects_start_es_after_block = 0;
|
||||
CURL *curl; // curl handler
|
||||
|
|
@ -79,7 +105,6 @@ class es_objects_plugin_impl
|
|||
|
||||
bool es_objects_plugin_impl::genesis()
|
||||
{
|
||||
|
||||
ilog("elasticsearch OBJECTS: inserting data from genesis");
|
||||
|
||||
graphene::chain::database &db = _self.database();
|
||||
|
|
@ -112,13 +137,142 @@ bool es_objects_plugin_impl::genesis()
|
|||
});
|
||||
}
|
||||
|
||||
if (_es_objects_account_role) {
|
||||
auto &idx = db.get_index_type<graphene::chain::account_role_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const account_role_object *>(obj);
|
||||
prepareTemplate<account_role_object>(*b, "account_role");
|
||||
});
|
||||
}
|
||||
if (_es_objects_committee_member) {
|
||||
auto &idx = db.get_index_type<graphene::chain::committee_member_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const committee_member_object *>(obj);
|
||||
prepareTemplate<committee_member_object>(*b, "committee_member");
|
||||
});
|
||||
}
|
||||
if (_es_objects_nft) {
|
||||
auto &idx = db.get_index_type<graphene::chain::nft_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const nft_object *>(obj);
|
||||
prepareTemplate<nft_object>(*b, "nft");
|
||||
});
|
||||
}
|
||||
if (_es_objects_nft) {
|
||||
auto &idx = db.get_index_type<graphene::chain::nft_metadata_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const nft_metadata_object *>(obj);
|
||||
prepareTemplate<nft_metadata_object>(*b, "nft_metadata");
|
||||
});
|
||||
}
|
||||
if (_es_objects_nft) {
|
||||
auto &idx = db.get_index_type<graphene::chain::offer_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const offer_object *>(obj);
|
||||
prepareTemplate<offer_object>(*b, "offer");
|
||||
});
|
||||
}
|
||||
if (_es_objects_son) {
|
||||
auto &idx = db.get_index_type<graphene::chain::sidechain_address_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const sidechain_address_object *>(obj);
|
||||
prepareTemplate<sidechain_address_object>(*b, "sidechain_address");
|
||||
});
|
||||
}
|
||||
if (_es_objects_son) {
|
||||
auto &idx = db.get_index_type<graphene::chain::sidechain_transaction_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const sidechain_transaction_object *>(obj);
|
||||
prepareTemplate<sidechain_transaction_object>(*b, "sidechain_transaction");
|
||||
});
|
||||
}
|
||||
if (_es_objects_son) {
|
||||
auto &idx = db.get_index_type<graphene::chain::son_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const son_object *>(obj);
|
||||
prepareTemplate<son_object>(*b, "son");
|
||||
});
|
||||
}
|
||||
if (_es_objects_son) {
|
||||
auto &idx = db.get_index_type<graphene::chain::son_proposal_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const son_proposal_object *>(obj);
|
||||
prepareTemplate<son_proposal_object>(*b, "son_proposal");
|
||||
});
|
||||
}
|
||||
if (_es_objects_son) {
|
||||
auto &idx = db.get_index_type<graphene::chain::son_wallet_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const son_wallet_object *>(obj);
|
||||
prepareTemplate<son_wallet_object>(*b, "son_wallet");
|
||||
});
|
||||
}
|
||||
if (_es_objects_son) {
|
||||
auto &idx = db.get_index_type<graphene::chain::son_wallet_deposit_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const son_wallet_deposit_object *>(obj);
|
||||
prepareTemplate<son_wallet_deposit_object>(*b, "son_wallet_deposit");
|
||||
});
|
||||
}
|
||||
if (_es_objects_son) {
|
||||
auto &idx = db.get_index_type<graphene::chain::son_wallet_withdraw_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const son_wallet_withdraw_object *>(obj);
|
||||
prepareTemplate<son_wallet_withdraw_object>(*b, "son_wallet_withdraw");
|
||||
});
|
||||
}
|
||||
if (_es_objects_transaction) {
|
||||
auto &idx = db.get_index_type<graphene::chain::transaction_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const transaction_object *>(obj);
|
||||
prepareTemplate<transaction_object>(*b, "transaction");
|
||||
});
|
||||
}
|
||||
if (_es_objects_vesting_balance) {
|
||||
auto &idx = db.get_index_type<graphene::chain::vesting_balance_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const vesting_balance_object *>(obj);
|
||||
prepareTemplate<vesting_balance_object>(*b, "vesting_balance");
|
||||
});
|
||||
}
|
||||
if (_es_objects_witness) {
|
||||
auto &idx = db.get_index_type<graphene::chain::witness_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const witness_object *>(obj);
|
||||
prepareTemplate<witness_object>(*b, "witness");
|
||||
});
|
||||
}
|
||||
if (_es_objects_worker) {
|
||||
auto &idx = db.get_index_type<graphene::chain::worker_index>();
|
||||
idx.inspect_all_objects([this, &db](const graphene::db::object &o) {
|
||||
auto obj = db.find_object(o.id);
|
||||
auto b = static_cast<const worker_object *>(obj);
|
||||
prepareTemplate<worker_object>(*b, "worker");
|
||||
});
|
||||
}
|
||||
|
||||
graphene::utilities::ES es;
|
||||
es.curl = curl;
|
||||
es.bulk_lines = bulk;
|
||||
es.elasticsearch_url = _es_objects_elasticsearch_url;
|
||||
es.auth = _es_objects_auth;
|
||||
if (!graphene::utilities::SendBulk(es))
|
||||
FC_THROW_EXCEPTION(fc::exception, "Error inserting genesis data.");
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error inserting genesis data.");
|
||||
else
|
||||
bulk.clear();
|
||||
|
||||
|
|
@ -197,6 +351,150 @@ bool es_objects_plugin_impl::index_database(const vector<object_id_type>& ids, s
|
|||
else
|
||||
prepareTemplate<asset_bitasset_data_object>(*ba, "bitasset");
|
||||
}
|
||||
} else if (value.is<account_role_object>() && _es_objects_account_role) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const account_role_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "account_role");
|
||||
else
|
||||
prepareTemplate<account_role_object>(*ba, "account_role");
|
||||
}
|
||||
} else if (value.is<committee_member_object>() && _es_objects_committee_member) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const committee_member_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "committee_member");
|
||||
else
|
||||
prepareTemplate<committee_member_object>(*ba, "committee_member");
|
||||
}
|
||||
} else if (value.is<nft_object>() && _es_objects_nft) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const nft_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "nft");
|
||||
else
|
||||
prepareTemplate<nft_object>(*ba, "nft");
|
||||
}
|
||||
} else if (value.is<nft_metadata_object>() && _es_objects_nft) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const nft_metadata_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "nft_metadata");
|
||||
else
|
||||
prepareTemplate<nft_metadata_object>(*ba, "nft_metadata");
|
||||
}
|
||||
} else if (value.is<offer_object>() && _es_objects_nft) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const offer_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "offer");
|
||||
else
|
||||
prepareTemplate<offer_object>(*ba, "offer");
|
||||
}
|
||||
} else if (value.is<sidechain_address_object>() && _es_objects_son) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const sidechain_address_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "sidechain_address");
|
||||
else
|
||||
prepareTemplate<sidechain_address_object>(*ba, "sidechain_address");
|
||||
}
|
||||
} else if (value.is<sidechain_transaction_object>() && _es_objects_son) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const sidechain_transaction_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "sidechain_transaction");
|
||||
else
|
||||
prepareTemplate<sidechain_transaction_object>(*ba, "sidechain_transaction");
|
||||
}
|
||||
} else if (value.is<son_object>() && _es_objects_son) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const son_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "son");
|
||||
else
|
||||
prepareTemplate<son_object>(*ba, "son");
|
||||
}
|
||||
} else if (value.is<son_proposal_object>() && _es_objects_son) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const son_proposal_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "son_proposal");
|
||||
else
|
||||
prepareTemplate<son_proposal_object>(*ba, "son_proposal");
|
||||
}
|
||||
} else if (value.is<son_wallet_object>() && _es_objects_son) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const son_wallet_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "son_wallet");
|
||||
else
|
||||
prepareTemplate<son_wallet_object>(*ba, "son_wallet");
|
||||
}
|
||||
} else if (value.is<son_wallet_deposit_object>() && _es_objects_son) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const son_wallet_deposit_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "son_wallet_deposit");
|
||||
else
|
||||
prepareTemplate<son_wallet_deposit_object>(*ba, "son_wallet_deposit");
|
||||
}
|
||||
} else if (value.is<son_wallet_withdraw_object>() && _es_objects_son) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const son_wallet_withdraw_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "son_wallet_withdraw");
|
||||
else
|
||||
prepareTemplate<son_wallet_withdraw_object>(*ba, "son_wallet_withdraw");
|
||||
}
|
||||
} else if (value.is<transaction_object>() && _es_objects_transaction) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const transaction_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "transaction");
|
||||
else
|
||||
prepareTemplate<transaction_object>(*ba, "transaction");
|
||||
}
|
||||
} else if (value.is<vesting_balance_object>() && _es_objects_vesting_balance) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const vesting_balance_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "vesting_balance");
|
||||
else
|
||||
prepareTemplate<vesting_balance_object>(*ba, "vesting_balance");
|
||||
}
|
||||
} else if (value.is<witness_object>() && _es_objects_witness) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const witness_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "witness");
|
||||
else
|
||||
prepareTemplate<witness_object>(*ba, "witness");
|
||||
}
|
||||
} else if (value.is<worker_object>() && _es_objects_worker) {
|
||||
auto obj = db.find_object(value);
|
||||
auto ba = static_cast<const worker_object *>(obj);
|
||||
if (ba != nullptr) {
|
||||
if (action == "delete")
|
||||
remove_from_database(ba->id, "worker");
|
||||
else
|
||||
prepareTemplate<worker_object>(*ba, "worker");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -296,52 +594,39 @@ void es_objects_plugin::plugin_set_program_options(
|
|||
)
|
||||
{
|
||||
cli.add_options()
|
||||
("es-objects-elasticsearch-url", boost::program_options::value<std::string>(), "Elasticsearch node url(http://localhost:9200/)")
|
||||
("es-objects-elasticsearch-url", boost::program_options::value<std::string>(),
|
||||
"Elasticsearch node url(http://localhost:9200/)")
|
||||
("es-objects-auth", boost::program_options::value<std::string>(), "Basic auth username:password('')")
|
||||
("es-objects-bulk-replay", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on replay(10000)")
|
||||
("es-objects-bulk-sync", boost::program_options::value<uint32_t>(), "Number of bulk documents to index on a synchronized chain(100)")
|
||||
("es-objects-bulk-replay", boost::program_options::value<uint32_t>(),
|
||||
"Number of bulk documents to index on replay(10000)")
|
||||
("es-objects-bulk-sync", boost::program_options::value<uint32_t>(),
|
||||
"Number of bulk documents to index on a synchronized chain(100)")
|
||||
("es-objects-proposals", boost::program_options::value<bool>(), "Store proposal objects(true)")
|
||||
("es-objects-accounts", boost::program_options::value<bool>(), "Store account objects(true)")
|
||||
("es-objects-assets", boost::program_options::value<bool>(), "Store asset objects(true)")
|
||||
("es-objects-balances", boost::program_options::value<bool>(), "Store balances objects(true)")
|
||||
("es-objects-limit-orders", boost::program_options::value<bool>(), "Store limit order objects(true)")
|
||||
("es-objects-asset-bitasset", boost::program_options::value<bool>(), "Store feed data(true)")
|
||||
("es-objects-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(ppobjects-)")
|
||||
("es-objects-keep-only-current", boost::program_options::value<bool>(), "Keep only current state of the objects(true)")
|
||||
("es-objects-start-es-after-block", boost::program_options::value<uint32_t>(), "Start doing ES job after block(0)")
|
||||
("es-objects-limit-orders", boost::program_options::value<bool>(), "Store limit order objects(false)")
|
||||
("es-objects-bitasset", boost::program_options::value<bool>(), "Store feed data(true)")
|
||||
("es-objects-account-role", boost::program_options::value<bool>(), "Store account role objects (true)")
|
||||
("es-objects-committee-member", boost::program_options::value<bool>(), "Store committee member objects(true)")
|
||||
("es-objects-nft", boost::program_options::value<bool>(), "Store nft objects (true)")
|
||||
("es-objects-son", boost::program_options::value<bool>(), "Store son objects (true)")
|
||||
("es-objects-transaction", boost::program_options::value<bool>(), "Store transaction objects (true)")
|
||||
("es-objects-vesting-balance", boost::program_options::value<bool>(), "Store vesting balance objects (true)")
|
||||
("es-objects-witness", boost::program_options::value<bool>(), "Store witness objects (true)")
|
||||
("es-objects-worker", boost::program_options::value<bool>(), "Store worker objects (true)")
|
||||
("es-objects-index-prefix", boost::program_options::value<std::string>(),
|
||||
"Add a prefix to the index(ppobjects-)")
|
||||
("es-objects-keep-only-current", boost::program_options::value<bool>(),
|
||||
"Keep only current state of the objects(true)")
|
||||
("es-objects-start-es-after-block", boost::program_options::value<uint32_t>(),
|
||||
"Start doing ES job after block(0)")
|
||||
;
|
||||
cfg.add(cli);
|
||||
}
|
||||
|
||||
void es_objects_plugin::plugin_initialize(const boost::program_options::variables_map& options)
|
||||
{
|
||||
database().applied_block.connect([this](const signed_block &b) {
|
||||
if(b.block_num() == 1) {
|
||||
if (!my->genesis())
|
||||
FC_THROW_EXCEPTION(fc::exception, "Error populating genesis data.");
|
||||
}
|
||||
});
|
||||
|
||||
database().new_objects.connect([this]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
|
||||
if(!my->index_database(ids, "create"))
|
||||
{
|
||||
FC_THROW_EXCEPTION(fc::exception, "Error creating object from ES database, we are going to keep trying.");
|
||||
}
|
||||
});
|
||||
database().changed_objects.connect([this]( const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts ) {
|
||||
if(!my->index_database(ids, "update"))
|
||||
{
|
||||
FC_THROW_EXCEPTION(fc::exception, "Error updating object from ES database, we are going to keep trying.");
|
||||
}
|
||||
});
|
||||
database().removed_objects.connect([this](const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts) {
|
||||
if(!my->index_database(ids, "delete"))
|
||||
{
|
||||
FC_THROW_EXCEPTION(fc::exception, "Error deleting object from ES database, we are going to keep trying.");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
if (options.count("es-objects-elasticsearch-url")) {
|
||||
my->_es_objects_elasticsearch_url = options["es-objects-elasticsearch-url"].as<std::string>();
|
||||
}
|
||||
|
|
@ -372,6 +657,30 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable
|
|||
if (options.count("es-objects-asset-bitasset")) {
|
||||
my->_es_objects_asset_bitasset = options["es-objects-asset-bitasset"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-account-role")) {
|
||||
my->_es_objects_balances = options["es-objects-account-role"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-committee-member")) {
|
||||
my->_es_objects_balances = options["es-objects-committee-member"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-nft")) {
|
||||
my->_es_objects_balances = options["es-objects-nft"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-son")) {
|
||||
my->_es_objects_balances = options["es-objects-son"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-transaction")) {
|
||||
my->_es_objects_balances = options["es-objects-transaction"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-vesting-balance")) {
|
||||
my->_es_objects_balances = options["es-objects-vesting-balance"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-witness")) {
|
||||
my->_es_objects_balances = options["es-objects-witness"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-worker")) {
|
||||
my->_es_objects_balances = options["es-objects-worker"].as<bool>();
|
||||
}
|
||||
if (options.count("es-objects-index-prefix")) {
|
||||
my->_es_objects_index_prefix = options["es-objects-index-prefix"].as<std::string>();
|
||||
}
|
||||
|
|
@ -381,6 +690,37 @@ void es_objects_plugin::plugin_initialize(const boost::program_options::variable
|
|||
if (options.count("es-objects-start-es-after-block")) {
|
||||
my->_es_objects_start_es_after_block = options["es-objects-start-es-after-block"].as<uint32_t>();
|
||||
}
|
||||
|
||||
database().applied_block.connect([this](const signed_block &b) {
|
||||
if(b.block_num() == 1 && my->_es_objects_start_es_after_block == 0) {
|
||||
if (!my->genesis())
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating genesis data.");
|
||||
}
|
||||
});
|
||||
database().new_objects.connect([this]( const vector<object_id_type>& ids,
|
||||
const flat_set<account_id_type>& impacted_accounts ) {
|
||||
if(!my->index_database(ids, "create"))
|
||||
{
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
|
||||
"Error creating object from ES database, we are going to keep trying.");
|
||||
}
|
||||
});
|
||||
database().changed_objects.connect([this]( const vector<object_id_type>& ids,
|
||||
const flat_set<account_id_type>& impacted_accounts ) {
|
||||
if(!my->index_database(ids, "update"))
|
||||
{
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
|
||||
"Error updating object from ES database, we are going to keep trying.");
|
||||
}
|
||||
});
|
||||
database().removed_objects.connect([this](const vector<object_id_type>& ids,
|
||||
const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts) {
|
||||
if(!my->index_database(ids, "delete"))
|
||||
{
|
||||
FC_THROW_EXCEPTION(graphene::chain::plugin_exception,
|
||||
"Error deleting object from ES database, we are going to keep trying.");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void es_objects_plugin::plugin_startup()
|
||||
|
|
@ -396,4 +736,4 @@ void es_objects_plugin::plugin_startup()
|
|||
ilog("elasticsearch OBJECTS: plugin_startup() begin");
|
||||
}
|
||||
|
||||
} }
|
||||
} }
|
||||
Loading…
Reference in a new issue