* increase delay for node connection
* remove cache from cli get_account
* add cli tests framework
* Adjust newly merged code to new API
* Merged changes from Bitshares PR 1036
* GRPH-76 - Short-cut long sequences of missed blocks
Fixes database::update_global_dynamic_data to speed up counting missed blocks.
(This also fixes a minor issue with counting - the previous algorithm would skip missed blocks for the witness who signed the first block after the gap.)
* Improved resilience of block database against corruption
* Moved reindex logic into database / chain_database, make use of additional blocks in block_database
Fixed tests wrt db.open
* Enable undo + fork database for final blocks in a replay
Don't remove blocks from block db when popping blocks, handle edge case in replay wrt fork_db, adapted unit tests
* Log starting block number of replay
* Prevent unsigned integer underflow
* Fixed lock detection
* Don't leave _data_dir empty if db is locked
* Writing the object_database is now almost atomic
* Improved consistency check for block_log
* Cut back block_log index file if inconsistent
* Fixed undo_database
* Added test case for broken merge on empty undo_db
* exclude second undo_db.enable() call in some cases
* Add missing change
* change bitshares to core in message
* Merge pull request #938 from bitshares/fix-block-storing
Store correct block ID when switching forks
* Fixed integer overflow issue
* Fix for history ID mismatch (Bitshares PR #875)
* Update the FC submodule with the changes for GRPH-4
* Merged Bitshares PR #1462 and compilation fixes
* Support/gitlab (#123)
* Updated gitlab process
* Fix undefined references in cli test
* Updated GitLab CI
* Fix #436 object_database created outside of witness data directory
* supplement more comments on database::_opened variable
* prevent segfault when destructing application obj
* Fixed test failures and compilation issue
* minor performance improvement
* Added comment
* Fix compilation in debug mode
* Fixed duplicate ops returned from get_account_history
* Fixed account_history_pagination test
* Removed unrelated comment
* Update to fixed version of fc
* Skip auth check when pushing self-generated blocks
* Extract public keys before pushing a transaction
* Dereference chain_database shared_ptr
* Updated transaction::signees to mutable
and
* updated get_signature_keys() to return a const reference,
* get_signature_keys() will update signees on first call,
* modified test cases and wallet.cpp accordingly,
* no longer construct a new signed_transaction object before pushing
* Added get_asset_count API
* No longer extract public keys before pushing a trx
and removed unused new added constructor and _get_signature_keys() function from signed_transaction struct
* changes to withdraw_vesting feature(for both cdd and GPOS)
* Comments update
* update to GPOS hardfork ref
* Remove leftover comment from merge
* fix for get_vesting_balance API call
* braces update
* Allow sufficient space for new undo_session
* Throw for deep nesting
* node.cpp: Check the attacker/buggy client before updating items ids
The peer is an attacker or buggy, which means the item_hashes_received is
not correct.
Move the check before updating items ids to save some time in this case.
* Create .gitlab-ci.yml
* Added cli_test to CI
* fixing build errors (#150)
* fixing build errors
vest type correction
* fixing build errors
vest type correction
* fixes
new Dockerfile
* vesting_balance_type correction
vesting_balance_type changed to normal
* gcc5 support to Dockerfile
gcc5 support to Dockerfile
* use random port numbers in app_test (#154)
* Changes to compile with GCC 7 (Ubuntu 18.04)
* proposal fail_reason bug fixed (#157)
* Added Sonarcloud code_quality to CI (#159)
* Added sonarcloud analysis (#158)
* changes to have separate methods and single withdrawal fee for multiple vest objects
* 163-fix, Return only non-zero vesting balances
* Support/gitlab develop (#168)
* Added code_quality to CI
* Update .gitlab-ci.yml
* Point to PBSA/peerplays-fc commit f13d063 (#167)
* [GRPH-3] Additional cli tests (#155)
* Additional cli tests
* Compatible with latest fc changes
* Fixed Spacing issues
* [GRPH-106] Added voting tests (#136)
* Added more voting tests
* Added additional option
* Adjust p2p log level (#180)
* Added submodule sync to peerplays compile process
* merge gpos to develop (#186)
* issue - 154: Don't allow to vote when vesting balance is 0
* changes to withdraw_vesting feature(for both cdd and GPOS)
* Comments update
* update to GPOS hardfork ref
* fix for get_vesting_balance API call
* braces update
* Create .gitlab-ci.yml
* fixing build errors (#150)
* fixing build errors
vest type correction
* fixing build errors
vest type correction
* fixes
new Dockerfile
* vesting_balance_type correction
vesting_balance_type changed to normal
* gcc5 support to Dockerfile
gcc5 support to Dockerfile
* Changes to compile with GCC 7 (Ubuntu 18.04)
* changes to have separate methods and single withdrawal fee for multiple vest objects
* 163-fix, Return only non-zero vesting balances
* Revert "Revert "GPOS protocol""
This reverts commit 67616417b7.
* add new line needed to gpos hardfork file
* temporarily comment out cli_vote_for_2_witnesses until refactor or delete
* fix gpos tests
* fix gitlab-ci conflict
* Fixed few error messages
* error message corrections at other places
* Updated FC repository to peerplays-network/peerplays-fc (#189)
Point to fc commit hash 6096e94 [latest-fc branch]
* Project name update in Doxyfile (#146)
* changes to allow user to vote in each sub-period
* Fixed GPOS vesting factor issue when proxy is set
* Added unit test for proxy voting
* Review changes
* changes to update last voting time
* resolve merge conflict
* unit test changes and also separated GPOS test suite
* delete unused variables
* removed witness check
* eliminate time gap between two consecutive vesting periods
* deleted GPOS specific test suite and updated gpos tests
* updated GPOS hf
* Fixed dividend distribution issue and added test case
* fix flag
* clean newlines gpos_tests
* adapt gpos_tests to changed flag
* Fix to roll in GPOS rules, carry votes from 6th sub-period
* check was already modified
* comments updated
* updated comments to the benefit of reviewer
* Added token symbol name in error messages
* Added token symbol name in error messages (#204)
* case 1: Fixed last voting time issue
* get_account bug fixed
* Fixed flag issue
* Fixed spelling issue
* remove non needed gcc5 changes to dockerfile
* GRPH134- High CPU Issue, websocket changes (#213)
* update submodule branch to refer to the latest commit on latest-fc branch (#214)
* Improve account maintenance performance (#130)
* Improve account maintenance performance
* merge fixes
* Fixed merge issue
* Fixed indentations and extra ';'
* Update CI for syncing gitmodules (#216)
* Added logging for the old update_expired_feeds bug
The old bug is https://github.com/cryptonomex/graphene/issues/615 .
Due to the bug, `update_median_feeds()` and `check_call_orders()`
will be called when a feed is not actually expired, normally this
should not affect consensus since calling them should not change
any data in the state.
However, the logging indicates that `check_call_orders()` did
change some data under certain circumstances, specifically, when
multiple limit order matching issue (#453) occurred at same block.
* https://github.com/bitshares/bitshares-core/issues/453
* Minor performance improvement for price::is_null()
* Use static refs in db_getter for immutable objects
* Minor performance improvement for db_maint
* Minor code updates for asset_evaluator.cpp
* changed an `assert()` to `FC_ASSERT()`
* replaced one `db.get(asset_id_type())` with `db.get_core_asset()`
* capture only required variables for lambda
* Improve update_expired_feeds performance #1093
* Change static refs to member pointers of db class
* Added getter for witness schedule object
* Added getter for core dynamic data object
* Use getters
* Removed unused variable
* Add comments for update_expired_feeds in db_block
* Minor refactoring of asset_create_evaluator::do_apply()
* Added FC_ASSERT for dynamic data id of core asset
* Added header inclusions in db_management.cpp
* fix global objects usage during replay
* Logging config parsing issue
* added new files
* compilation fix
* Simplified code in database::pay_workers()
* issue with withdrawal
* Added unit test for empty account history
* set extensions default values
* Update GPOS hardfork date and don't allow GPOS features before hardfork time
* refer to latest commit of latest-fc branch (#224)
* account name or id support in all database api
* asset id or name support in all asset APIs
* Fixed compilation issues
* Fixed alignment issues
* Externalized some API templates
* Externalize serialization of blocks, tx, ops
* Externalized db objects
* Externalized genesis serialization
* Externalized serialization in protocol library
* Undo superfluous change
* remove default value for extension parameter
* fix compilation issues
* GRPH-46-Quit_command_cliwallet
* removed multiple function definition
* Fixed chainparameter update proposal issue
* Move GPOS withdraw logic to have single transaction(also single fee) and update API
* Added log for authorization failure of proposal operations
* Votes consideration on GPOS activation
* bump fc version
* fix gpos tests
* Bump fc version
* Updated gpos/voting_tests
* Fixed withdraw vesting bug
* Added unit test
* Update hardfork date for TESTNET, sync fc module and update logs
* avoid wlog as it is filling up space
* Beatrice hot fix(sync issue fix)
* gpos tests fix
* Set hardfork date to Jan5th on TESTNET
* Merge Elasticplugin, snapshot plugin and graphene updates to beatrice (#304)
* check witness signature before adding block to fork db
* Replace verify_no_send_in_progress with no_parallel_execution_guard
* fixed cli_wallet log issue
* Port plugin sanitization code
* avoid directly overwriting wallet file
* Implemented "plugins" config variable
* allow plugin to have descriptions
* Merge pull request #444 from oxarbitrage/elasticsearch
Elasticsearch plugin
* Merge pull request #500 from oxarbitrage/elasticsearch-extras
es_objects plugin
* Merge pull request #873 from pmconrad/585_fix_history_ids
Fix history ids
* Merge pull request #1201 from oxarbitrage/elasticsearch_tests2
Elasticsearch refactor
* Merge pull request #1271 from oxarbitrage/es_objects
refine es_objects plugin
* Merge pull request #1429 from oxarbitrage/es_objects_templates
Add an adaptor to es_objects and template function to reduce code
* Merge pull request #1458 from oxarbitrage/issue1455
add option elasticsearch-start-es-after-block to es plugin
* Merge pull request #1541 from oxarbitrage/es_objects_start_after_block
add es-objects-start-es-after-block option
* explicitly cleanup external library facilities
* Merge pull request #1717 from oxarbitrage/issue1652
add genesis data to es_objects
* Merge pull request #1073 from xiangxn/merge-impacted
merge impacted into db_notify
* Merge pull request #1725 from oxarbitrage/issue1682
elasticsearch history api #1682
* change ES index prefixes to Peerplays-specific
* sync develop with beatrice
* fix the data writing to ES during sync issues
* fix CLI tests
* brought updates from mainnet branch (#285)
* Fix unit test failures (#289)
* fixed unit test failures from the recent merges
* fixed unit test failures from the recent merges
* enable snapshot plugin (#288)
* sync fc branch(build optimization changes)
* update to es plugin
* fix verify witness signature method (#295)
* enable mandatory plugins to have smooth transition for next release
* updated tests to keep in-line with plugin changes
Co-authored-by: Sandip Patel <sandip@knackroot.com>
Co-authored-by: Peter Conrad <conrad@quisquis.de>
Co-authored-by: Alfredo <oxarbitrage@gmail.com>
Co-authored-by: Abit <abitmore@users.noreply.github.com>
Co-authored-by: crypto-ape <43807588+crypto-ape@users.noreply.github.com>
Co-authored-by: gladcow <s.gladkov@pbsa.info>
* sync latest fc commit on beatrice
* sweeps winner_ticket_id changes
Co-authored-by: Bobinson K B <bobinson@gmail.com>
Co-authored-by: gladcow <s.gladkov@pbsa.info>
Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
Co-authored-by: Miha Čančula <miha@noughmad.eu>
Co-authored-by: Ronak Patel <r.patel@pbsa.info>
Co-authored-by: Srdjan Obucina <obucinac@gmail.com>
Co-authored-by: Peter Conrad <conrad@quisquis.de>
Co-authored-by: Peter Conrad <cyrano@quisquis.de>
Co-authored-by: Abit <abitmore@users.noreply.github.com>
Co-authored-by: Roshan Syed <r.syed@pbsa.info>
Co-authored-by: cifer <maintianyu@gmail.com>
Co-authored-by: John Jones <jmjatlanta@gmail.com>
Co-authored-by: Sandip Patel <sandip@knackroot.com>
Co-authored-by: Wei Yang <richard.weiyang@gmail.com>
Co-authored-by: gladcow <jahr@yandex.ru>
Co-authored-by: satyakoneru <satyakoneru.iiith@gmail.com>
Co-authored-by: crypto-ape <43807588+crypto-ape@users.noreply.github.com>
622 lines
23 KiB
C++
/*
 * Copyright (c) 2017 Cryptonomex, Inc., and contributors.
 *
 * The MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include <graphene/elasticsearch/elasticsearch_plugin.hpp>
#include <graphene/chain/impacted.hpp>
#include <graphene/chain/account_evaluator.hpp>
#include <fc/smart_ref_impl.hpp>
#include <curl/curl.h>

namespace graphene { namespace elasticsearch {

namespace detail
{

class elasticsearch_plugin_impl
{
   public:
      elasticsearch_plugin_impl(elasticsearch_plugin& _plugin)
         : _self( _plugin )
      { curl = curl_easy_init(); }
      virtual ~elasticsearch_plugin_impl();

      bool update_account_histories( const signed_block& b );

      graphene::chain::database& database()
      {
         return _self.database();
      }

      elasticsearch_plugin& _self;
      primary_index< operation_history_index >* _oho_index;

      std::string _elasticsearch_node_url = "http://localhost:9200/";
      uint32_t _elasticsearch_bulk_replay = 10000;
      uint32_t _elasticsearch_bulk_sync = 100;
      bool _elasticsearch_visitor = false;
      std::string _elasticsearch_basic_auth = "";
      std::string _elasticsearch_index_prefix = "peerplays-";
      bool _elasticsearch_operation_object = false;
      uint32_t _elasticsearch_start_es_after_block = 0;
      bool _elasticsearch_operation_string = true;
      mode _elasticsearch_mode = mode::only_save;
      CURL *curl; // curl handler
      vector <string> bulk_lines; // vector of op lines
      vector<std::string> prepare;

      graphene::utilities::ES es;
      uint32_t limit_documents;
      int16_t op_type;
      operation_history_struct os;
      block_struct bs;
      visitor_struct vs;
      bulk_struct bulk_line_struct;
      std::string bulk_line;
      std::string index_name;
      bool is_sync = false;
      fc::time_point last_sync;
   private:
      bool add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho, const uint32_t block_number );
      const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj,
                                                            const account_id_type account_id,
                                                            const optional <operation_history_object>& oho);
      const account_statistics_object& getStatsObject(const account_id_type account_id);
      void growStats(const account_statistics_object& stats_obj, const account_transaction_history_object& ath);
      void getOperationType(const optional <operation_history_object>& oho);
      void doOperationHistory(const optional <operation_history_object>& oho);
      void doBlock(const optional <operation_history_object>& oho, const signed_block& b);
      void doVisitor(const optional <operation_history_object>& oho);
      void checkState(const fc::time_point_sec& block_time);
      void cleanObjects(const account_transaction_history_object& ath, account_id_type account_id);
      void createBulkLine(const account_transaction_history_object& ath);
      void prepareBulk(const account_transaction_history_id_type& ath_id);
      void populateESstruct();
};

elasticsearch_plugin_impl::~elasticsearch_plugin_impl()
{
   if (curl) {
      curl_easy_cleanup(curl);
      curl = nullptr;
   }
   return;
}

bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b )
{
   checkState(b.timestamp);
   index_name = graphene::utilities::generateIndexName(b.timestamp, _elasticsearch_index_prefix);

   graphene::chain::database& db = database();
   const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
   bool is_first = true;
   auto skip_oho_id = [&is_first,&db,this]() {
      if( is_first && db._undo_db.enabled() ) // this ensures that the current id is rolled back on undo
      {
         db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );
         is_first = false;
      }
      else
         _oho_index->use_next_id();
   };
   for( const optional< operation_history_object >& o_op : hist ) {
      optional <operation_history_object> oho;

      auto create_oho = [&]() {
         is_first = false;
         return optional<operation_history_object>(
               db.create<operation_history_object>([&](operation_history_object &h) {
                  if (o_op.valid())
                  {
                     h.op = o_op->op;
                     h.result = o_op->result;
                     h.block_num = o_op->block_num;
                     h.trx_in_block = o_op->trx_in_block;
                     h.op_in_trx = o_op->op_in_trx;
                     h.virtual_op = o_op->virtual_op;
                  }
               }));
      };

      if( !o_op.valid() ) {
         skip_oho_id();
         continue;
      }
      oho = create_oho();

      // populate what we can before impacted loop
      getOperationType(oho);
      doOperationHistory(oho);
      doBlock(oho, b);
      if(_elasticsearch_visitor)
         doVisitor(oho);

      const operation_history_object& op = *o_op;

      // get the set of accounts this operation applies to
      flat_set<account_id_type> impacted;
      vector<authority> other;
      operation_get_required_authorities( op.op, impacted, impacted, other ); // fee_payer is added here

      if( op.op.which() == operation::tag< account_create_operation >::value )
         impacted.insert( op.result.get<object_id_type>() );
      else
         graphene::chain::operation_get_impacted_accounts( op.op, impacted );

      for( auto& a : other )
         for( auto& item : a.account_auths )
            impacted.insert( item.first );

      for( auto& account_id : impacted )
      {
         if(!add_elasticsearch( account_id, oho, b.block_num() ))
            return false;
      }
   }
   // we send bulk at end of block when we are in sync for better real time client experience
   if(is_sync)
   {
      populateESstruct();
      if(es.bulk_lines.size() > 0)
      {
         prepare.clear();
         if(!graphene::utilities::SendBulk(es))
            return false;
         else
            bulk_lines.clear();
      }
   }

   return true;
}

void elasticsearch_plugin_impl::checkState(const fc::time_point_sec& block_time)
{
   fc::time_point current_time(fc::time_point::now());
   if(((current_time - block_time) < fc::seconds(30)) || (current_time - last_sync > fc::seconds(60)))
   {
      limit_documents = _elasticsearch_bulk_sync;
      is_sync = true;
      last_sync = current_time;
   }
   else
   {
      limit_documents = _elasticsearch_bulk_replay;
      is_sync = false;
   }
}
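
The sync-versus-replay decision made by `checkState` can be looked at in isolation. The following is a minimal sketch rather than the plugin's code: it swaps `fc::time_point` for `std::chrono`, and the names `batch_state` and `choose_batch_limit` are hypothetical. The 30-second and 60-second thresholds and the default batch sizes (100 and 10000) are taken from the listing.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for the is_sync / limit_documents / last_sync members.
struct batch_state {
   bool is_sync = false;
   uint32_t limit_documents = 0;
   std::chrono::system_clock::time_point last_sync{};
};

// Same condition as checkState: use the small "sync" batch when the block is
// less than 30 seconds old, or when more than 60 seconds have passed since the
// state was last marked as in sync; otherwise use the large "replay" batch.
inline void choose_batch_limit(batch_state& st,
                               std::chrono::system_clock::time_point now,
                               std::chrono::system_clock::time_point block_time,
                               uint32_t bulk_sync = 100,
                               uint32_t bulk_replay = 10000)
{
   using std::chrono::seconds;
   if ((now - block_time) < seconds(30) || (now - st.last_sync) > seconds(60)) {
      st.limit_documents = bulk_sync;
      st.is_sync = true;
      st.last_sync = now;
   } else {
      st.limit_documents = bulk_replay;
      st.is_sync = false;
   }
}
```

One consequence of the second clause is that, even while replaying old blocks, the state is marked as in sync at least once per 60 seconds of wall-clock time, which triggers the end-of-block flush in `update_account_histories`.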

void elasticsearch_plugin_impl::getOperationType(const optional <operation_history_object>& oho)
{
   if (!oho->id.is_null())
      op_type = oho->op.which();
}

void elasticsearch_plugin_impl::doOperationHistory(const optional <operation_history_object>& oho)
{
   os.trx_in_block = oho->trx_in_block;
   os.op_in_trx = oho->op_in_trx;
   os.operation_result = fc::json::to_string(oho->result);
   os.virtual_op = oho->virtual_op;

   if(_elasticsearch_operation_object) {
      oho->op.visit(fc::from_static_variant(os.op_object, FC_PACK_MAX_DEPTH));
      adaptor_struct adaptor;
      os.op_object = adaptor.adapt(os.op_object.get_object());
   }
   if(_elasticsearch_operation_string)
      os.op = fc::json::to_string(oho->op);
}

void elasticsearch_plugin_impl::doBlock(const optional <operation_history_object>& oho, const signed_block& b)
{
   std::string trx_id = "";
   if(oho->trx_in_block < b.transactions.size())
      trx_id = b.transactions[oho->trx_in_block].id().str();
   bs.block_num = b.block_num();
   bs.block_time = b.timestamp;
   bs.trx_id = trx_id;
}

void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_object>& oho)
{
   operation_visitor o_v;
   oho->op.visit(o_v);

   vs.fee_data.asset = o_v.fee_asset;
   vs.fee_data.amount = o_v.fee_amount;

   vs.transfer_data.asset = o_v.transfer_asset_id;
   vs.transfer_data.amount = o_v.transfer_amount;
   vs.transfer_data.from = o_v.transfer_from;
   vs.transfer_data.to = o_v.transfer_to;

   vs.fill_data.order_id = o_v.fill_order_id;
   vs.fill_data.account_id = o_v.fill_account_id;
   vs.fill_data.pays_asset_id = o_v.fill_pays_asset_id;
   vs.fill_data.pays_amount = o_v.fill_pays_amount;
   vs.fill_data.receives_asset_id = o_v.fill_receives_asset_id;
   vs.fill_data.receives_amount = o_v.fill_receives_amount;
   //vs.fill_data.fill_price = o_v.fill_fill_price;
   //vs.fill_data.is_maker = o_v.fill_is_maker;
}

bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account_id,
                                                   const optional <operation_history_object>& oho,
                                                   const uint32_t block_number)
{
   const auto &stats_obj = getStatsObject(account_id);
   const auto &ath = addNewEntry(stats_obj, account_id, oho);
   growStats(stats_obj, ath);
   if(block_number > _elasticsearch_start_es_after_block) {
      createBulkLine(ath);
      prepareBulk(ath.id);
   }
   cleanObjects(ath, account_id);

   if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearch
      prepare.clear();
      populateESstruct();
      if(!graphene::utilities::SendBulk(es))
         return false;
      else
         bulk_lines.clear();
   }

   return true;
}

const account_statistics_object& elasticsearch_plugin_impl::getStatsObject(const account_id_type account_id)
{
   graphene::chain::database& db = database();
   const auto &acct = db.get<account_object>(account_id);
   return acct.statistics(db);
}

const account_transaction_history_object& elasticsearch_plugin_impl::addNewEntry(const account_statistics_object& stats_obj,
                                                                                 const account_id_type account_id,
                                                                                 const optional <operation_history_object>& oho)
{
   graphene::chain::database& db = database();
   const auto &ath = db.create<account_transaction_history_object>([&](account_transaction_history_object &obj) {
      obj.operation_id = oho->id;
      obj.account = account_id;
      obj.sequence = stats_obj.total_ops + 1;
      obj.next = stats_obj.most_recent_op;
   });

   return ath;
}

void elasticsearch_plugin_impl::growStats(const account_statistics_object& stats_obj,
                                          const account_transaction_history_object& ath)
{
   graphene::chain::database& db = database();
   db.modify(stats_obj, [&](account_statistics_object &obj) {
      obj.most_recent_op = ath.id;
      obj.total_ops = ath.sequence;
   });
}

void elasticsearch_plugin_impl::createBulkLine(const account_transaction_history_object& ath)
{
   bulk_line_struct.account_history = ath;
   bulk_line_struct.operation_history = os;
   bulk_line_struct.operation_type = op_type;
   bulk_line_struct.operation_id_num = ath.operation_id.instance.value;
   bulk_line_struct.block_data = bs;
   if(_elasticsearch_visitor)
      bulk_line_struct.additional_data = vs;
   bulk_line = fc::json::to_string(bulk_line_struct);
}

void elasticsearch_plugin_impl::prepareBulk(const account_transaction_history_id_type& ath_id)
{
   const std::string _id = fc::json::to_string(ath_id);
   fc::mutable_variant_object bulk_header;
   bulk_header["_index"] = index_name;
   bulk_header["_type"] = "data";
   bulk_header["_id"] = fc::to_string(ath_id.space_id) + "." + fc::to_string(ath_id.type_id) + "." + ath_id.instance;
   prepare = graphene::utilities::createBulk(bulk_header, bulk_line);
   bulk_lines.insert(bulk_lines.end(), prepare.begin(), prepare.end());
}
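
`prepareBulk` appends what `createBulk` returns to `bulk_lines`, and those lines are later sent through the Elasticsearch bulk endpoint, which expects an action line followed by a document line for each indexed item. The implementation of `graphene::utilities::createBulk` is not part of this listing, so the sketch below is only an assumption about the shape of its output. `format_object_id` and `make_bulk_pair` are hypothetical names, the index name and id in the usage note are arbitrary, and no JSON escaping is performed.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: formats an object id as "space.type.instance",
// the shape prepareBulk assigns to the "_id" header field.
inline std::string format_object_id(uint8_t space_id, uint8_t type_id, uint64_t instance)
{
   return std::to_string(space_id) + "." + std::to_string(type_id) + "." + std::to_string(instance);
}

// Hypothetical stand-in for createBulk: returns the two lines the bulk API
// expects for one "index" action (metadata line first, document second).
// Assumes index and id contain no characters that need JSON escaping.
inline std::vector<std::string> make_bulk_pair(const std::string& index,
                                               const std::string& id,
                                               const std::string& document_json)
{
   const std::string header =
      "{ \"index\" : { \"_index\" : \"" + index +
      "\", \"_type\" : \"data\", \"_id\" : \"" + id + "\" } }";
   return { header, document_json };
}
```

For example, `make_bulk_pair("peerplays-example", format_object_id(2, 9, 42), "{\"x\":1}")` yields a header line naming the index and the id `2.9.42`, followed by the document itself. When the lines are sent, each one must end with a newline.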

void elasticsearch_plugin_impl::cleanObjects(const account_transaction_history_object& ath, account_id_type account_id)
{
   graphene::chain::database& db = database();
   // remove everything except current object from ath
   const auto &his_idx = db.get_index_type<account_transaction_history_index>();
   const auto &by_seq_idx = his_idx.indices().get<by_seq>();
   auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0));
   if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.id) {
      // if found, remove the entry
      const auto remove_op_id = itr->operation_id;
      const auto itr_remove = itr;
      ++itr;
      db.remove( *itr_remove );
      // modify previous node's next pointer
      // this should be always true, but just have a check here
      if( itr != by_seq_idx.end() && itr->account == account_id )
      {
         db.modify( *itr, [&]( account_transaction_history_object& obj ){
            obj.next = account_transaction_history_id_type();
         });
      }
      // do the same on oho
      const auto &by_opid_idx = his_idx.indices().get<by_opid>();
      if (by_opid_idx.find(remove_op_id) == by_opid_idx.end()) {
         db.remove(remove_op_id(db));
      }
   }
}

void elasticsearch_plugin_impl::populateESstruct()
{
   es.curl = curl;
   es.bulk_lines = bulk_lines;
   es.elasticsearch_url = _elasticsearch_node_url;
   es.auth = _elasticsearch_basic_auth;
}

} // end namespace detail

elasticsearch_plugin::elasticsearch_plugin() :
   my( new detail::elasticsearch_plugin_impl(*this) )
{
}

elasticsearch_plugin::~elasticsearch_plugin()
{
}

std::string elasticsearch_plugin::plugin_name()const
{
   return "elasticsearch";
}
std::string elasticsearch_plugin::plugin_description()const
{
   return "Stores account history data in elasticsearch database(EXPERIMENTAL).";
}

void elasticsearch_plugin::plugin_set_program_options(
   boost::program_options::options_description& cli,
   boost::program_options::options_description& cfg
   )
{
   cli.add_options()
         ("elasticsearch-node-url", boost::program_options::value<std::string>(),
               "Elastic Search database node url(http://localhost:9200/)")
         ("elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
               "Number of bulk documents to index on replay(10000)")
         ("elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
               "Number of bulk documents to index on a synchronized chain(100)")
         ("elasticsearch-visitor", boost::program_options::value<bool>(),
               "Use visitor to index additional data(slows down the replay(false))")
         ("elasticsearch-basic-auth", boost::program_options::value<std::string>(),
               "Pass basic auth to elasticsearch database('')")
         ("elasticsearch-index-prefix", boost::program_options::value<std::string>(),
               "Add a prefix to the index(peerplays-)")
         ("elasticsearch-operation-object", boost::program_options::value<bool>(),
               "Save operation as object(false)")
         ("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
               "Start doing ES job after block(0)")
         ("elasticsearch-operation-string", boost::program_options::value<bool>(),
               "Save operation as string. Needed to serve history api calls(true)")
         ("elasticsearch-mode", boost::program_options::value<uint16_t>(),
               "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
         ;
   cfg.add(cli);
}

void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
   my->_oho_index = database().add_index< primary_index< operation_history_index > >();
   database().add_index< primary_index< account_transaction_history_index > >();

   if (options.count("elasticsearch-node-url")) {
      my->_elasticsearch_node_url = options["elasticsearch-node-url"].as<std::string>();
   }
   if (options.count("elasticsearch-bulk-replay")) {
      my->_elasticsearch_bulk_replay = options["elasticsearch-bulk-replay"].as<uint32_t>();
   }
   if (options.count("elasticsearch-bulk-sync")) {
      my->_elasticsearch_bulk_sync = options["elasticsearch-bulk-sync"].as<uint32_t>();
   }
   if (options.count("elasticsearch-visitor")) {
      my->_elasticsearch_visitor = options["elasticsearch-visitor"].as<bool>();
   }
   if (options.count("elasticsearch-basic-auth")) {
      my->_elasticsearch_basic_auth = options["elasticsearch-basic-auth"].as<std::string>();
   }
   if (options.count("elasticsearch-index-prefix")) {
      my->_elasticsearch_index_prefix = options["elasticsearch-index-prefix"].as<std::string>();
   }
   if (options.count("elasticsearch-operation-object")) {
      my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as<bool>();
   }
   if (options.count("elasticsearch-start-es-after-block")) {
      my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
   }
   if (options.count("elasticsearch-operation-string")) {
      my->_elasticsearch_operation_string = options["elasticsearch-operation-string"].as<bool>();
   }
   if (options.count("elasticsearch-mode")) {
      const auto option_number = options["elasticsearch-mode"].as<uint16_t>();
      if(option_number > mode::all)
         FC_THROW_EXCEPTION(fc::exception, "Elasticsearch mode not valid");
      my->_elasticsearch_mode = static_cast<mode>(options["elasticsearch-mode"].as<uint16_t>());
   }

   if(my->_elasticsearch_mode != mode::only_query) {
      if (my->_elasticsearch_mode == mode::all && !my->_elasticsearch_operation_string)
         FC_THROW_EXCEPTION(fc::exception,
            "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");

      database().applied_block.connect([this](const signed_block &b) {
         if (!my->update_account_histories(b))
            FC_THROW_EXCEPTION(fc::exception,
               "Error populating ES database, we are going to keep trying.");
      });
   }
}
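
The two validation rules that `plugin_initialize` applies to `elasticsearch-mode` can be exercised on their own. This is a minimal sketch under stated assumptions: the `mode` enum is redeclared here with the values from the option help text (the real one comes from the plugin header, which is not shown), `parse_mode` is a hypothetical helper, and `std::invalid_argument` stands in for `FC_THROW_EXCEPTION`.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Assumed to mirror the plugin's enum; values taken from the option help text.
enum mode : uint16_t { only_save = 0, only_query = 1, all = 2 };

// Hypothetical helper combining both checks from plugin_initialize:
// 1) the numeric option must not exceed mode::all;
// 2) mode::all requires operations to be stored as strings, because the
//    history API parses them back from the stored string form.
inline mode parse_mode(uint16_t option_number, bool operation_string)
{
   if (option_number > mode::all)
      throw std::invalid_argument("Elasticsearch mode not valid");
   const mode m = static_cast<mode>(option_number);
   if (m == mode::all && !operation_string)
      throw std::invalid_argument(
         "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");
   return m;
}
```

Checking the range before the `static_cast` matters: casting an out-of-range number would otherwise produce a `mode` value that none of the later comparisons handle.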

void elasticsearch_plugin::plugin_startup()
{
   graphene::utilities::ES es;
   es.curl = my->curl;
   es.elasticsearch_url = my->_elasticsearch_node_url;
   es.auth = my->_elasticsearch_basic_auth;

   if(!graphene::utilities::checkES(es))
      FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url));
   ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin");
}

operation_history_object elasticsearch_plugin::get_operation_by_id(operation_history_id_type id)
{
   const string operation_id_string = std::string(object_id_type(id));

   // The id is a string such as "1.11.5", so it must be quoted on both sides in the JSON body.
   const string query = R"(
   {
      "query": {
         "match":
         {
            "account_history.operation_id": ")" + operation_id_string + R"("
         }
      }
   }
   )";

   auto es = prepareHistoryQuery(query);
   const auto response = graphene::utilities::simpleQuery(es);
   variant variant_response = fc::json::from_string(response);
   const auto source = variant_response["hits"]["hits"][size_t(0)]["_source"];
   return fromEStoOperation(source);
}

vector<operation_history_object> elasticsearch_plugin::get_account_history(
      const account_id_type account_id,
      operation_history_id_type stop = operation_history_id_type(),
      unsigned limit = 100,
      operation_history_id_type start = operation_history_id_type())
{
   const string account_id_string = std::string(object_id_type(account_id));

   const auto stop_number = stop.instance.value;
   const auto start_number = start.instance.value;

   string range = "";
   if(stop_number == 0)
      range = " AND operation_id_num: ["+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";
   else if(stop_number > 0)
      range = " AND operation_id_num: {"+fc::to_string(stop_number)+" TO "+fc::to_string(start_number)+"]";

   const string query = R"(
   {
      "size": )" + fc::to_string(limit) + R"(,
      "sort" : [{ "operation_id_num" : {"order" : "desc"}}],
      "query": {
         "bool": {
            "must": [
            {
               "query_string": {
                  "query": "account_history.account: )" + account_id_string + range + R"("
               }
            }
            ]
         }
      }
   }
   )";

   auto es = prepareHistoryQuery(query);

   vector<operation_history_object> result;

   if(!graphene::utilities::checkES(es))
      return result;

   const auto response = graphene::utilities::simpleQuery(es);
   variant variant_response = fc::json::from_string(response);

   // Read "total" itself: it is an object with a "value" member on ES-7 and a plain number on ES-6.
   const auto hits = variant_response["hits"]["total"];
   uint32_t size;
   if( hits.is_object() ) // ES-7 ?
      size = static_cast<uint32_t>(hits["value"].as_uint64());
   else // probably ES-6
      size = static_cast<uint32_t>(hits.as_uint64());

   size = std::min( size, limit );

   for(unsigned i=0; i<size; i++)
   {
      const auto source = variant_response["hits"]["hits"][size_t(i)]["_source"];
      result.push_back(fromEStoOperation(source));
   }
   return result;
}
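
The range clause built in `get_account_history` relies on the bracket convention of the Elasticsearch `query_string` syntax: a square bracket makes a bound inclusive and a curly brace makes it exclusive, and the two can be mixed. A minimal sketch of just that step, using `std::to_string` in place of `fc::to_string`; `make_range_clause` is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Builds the same clause as the listing: when stop is 0 the lower bound is
// inclusive ('['); for any other stop it is exclusive ('{'), so the operation
// with id == stop is not returned. The upper bound (start) is always inclusive.
inline std::string make_range_clause(uint64_t stop_number, uint64_t start_number)
{
   const char open = (stop_number == 0) ? '[' : '{';
   return std::string(" AND operation_id_num: ") + open +
          std::to_string(stop_number) + " TO " + std::to_string(start_number) + "]";
}
```

For instance, `make_range_clause(5, 100)` produces ` AND operation_id_num: {5 TO 100]`, which matches ids 6 through 100.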

operation_history_object elasticsearch_plugin::fromEStoOperation(variant source)
{
   operation_history_object result;

   const auto operation_id = source["account_history"]["operation_id"];
   fc::from_variant( operation_id, result.id, GRAPHENE_MAX_NESTED_OBJECTS );

   const auto op = fc::json::from_string(source["operation_history"]["op"].as_string());
   fc::from_variant( op, result.op, GRAPHENE_MAX_NESTED_OBJECTS );

   const auto operation_result = fc::json::from_string(source["operation_history"]["operation_result"].as_string());
   fc::from_variant( operation_result, result.result, GRAPHENE_MAX_NESTED_OBJECTS );

   result.block_num = source["block_data"]["block_num"].as_uint64();
   result.trx_in_block = source["operation_history"]["trx_in_block"].as_uint64();
   result.op_in_trx = source["operation_history"]["op_in_trx"].as_uint64();
   // was assigned to trx_in_block, which overwrote the value read two lines above
   result.virtual_op = source["operation_history"]["virtual_op"].as_uint64();

   return result;
}

graphene::utilities::ES elasticsearch_plugin::prepareHistoryQuery(string query)
{
   CURL *curl;
   curl = curl_easy_init();

   graphene::utilities::ES es;
   es.curl = curl;
   es.elasticsearch_url = my->_elasticsearch_node_url;
   es.index_prefix = my->_elasticsearch_index_prefix;
   es.endpoint = es.index_prefix + "*/data/_search";
   es.query = query;

   return es;
}

mode elasticsearch_plugin::get_running_mode()
{
   return my->_elasticsearch_mode;
}


} }