WIP, refactor SON processing

This commit is contained in:
Srdjan Obucina 2020-02-27 02:28:19 +01:00
parent 47c98c203a
commit e9a26de347
8 changed files with 149 additions and 143 deletions

View file

@ -461,7 +461,7 @@ void database::init_genesis(const genesis_state_type& genesis_state)
FC_ASSERT(create<account_object>([this](account_object& a) {
a.name = "son-account";
a.statistics = create<account_statistics_object>([&](account_statistics_object& s){s.owner = a.id;}).id;
a.owner.weight_threshold = 0;
a.owner.weight_threshold = 1;
a.active.weight_threshold = 0;
a.registrar = a.lifetime_referrer = a.referrer = GRAPHENE_SON_ACCOUNT;
a.membership_expiration_date = time_point_sec::maximum();

View file

@ -20,11 +20,11 @@ public:
std::vector<std::string> get_sidechain_withdraw_addresses();
void sidechain_event_data_received(const sidechain_event_data& sed);
void process_deposits();
void process_withdrawals();
virtual void recreate_primary_wallet() = 0;
virtual void process_deposits() = 0;
virtual void process_deposit(const son_wallet_deposit_object& swdo) = 0;
virtual void process_withdrawals() = 0;
virtual void process_withdrawal(const son_wallet_withdraw_object& swwo) = 0;
protected:

View file

@ -75,9 +75,7 @@ public:
virtual ~sidechain_net_handler_bitcoin();
void recreate_primary_wallet();
void process_deposits();
void process_deposit(const son_wallet_deposit_object& swdo);
void process_withdrawals();
void process_withdrawal(const son_wallet_withdraw_object& swwo);
std::string create_multisignature_wallet( const std::vector<std::string> public_keys );

View file

@ -14,9 +14,7 @@ public:
virtual ~sidechain_net_handler_peerplays();
void recreate_primary_wallet();
void process_deposits();
void process_deposit(const son_wallet_deposit_object& swdo);
void process_withdrawals();
void process_withdrawal(const son_wallet_withdraw_object& swwo);
std::string create_multisignature_wallet( const std::vector<std::string> public_keys );

View file

@ -44,6 +44,9 @@ class peerplays_sidechain_plugin_impl
void schedule_heartbeat_loop();
void heartbeat_loop();
void schedule_son_processing();
void son_processing();
void approve_proposals();
void create_son_down_proposals();
void recreate_primary_wallet();
void process_deposits();
@ -59,12 +62,12 @@ class peerplays_sidechain_plugin_impl
son_id_type current_son_id;
std::unique_ptr<peerplays_sidechain::sidechain_net_manager> net_manager;
std::map<chain::public_key_type, fc::ecc::private_key> _private_keys;
std::set<chain::son_id_type> _sons;
std::map<chain::public_key_type, fc::ecc::private_key> _private_keys;
fc::future<void> _heartbeat_task;
fc::future<void> _son_processing_task;
void on_applied_block( const signed_block& b );
void on_new_objects(const vector<object_id_type>& new_object_ids);
};
@ -88,6 +91,15 @@ peerplays_sidechain_plugin_impl::~peerplays_sidechain_plugin_impl()
} catch(fc::exception& e) {
edump((e.to_detail_string()));
}
try {
if( _son_processing_task.valid() )
_son_processing_task.cancel_and_wait(__FUNCTION__);
} catch(fc::canceled_exception&) {
//Expected exception. Move along.
} catch(fc::exception& e) {
edump((e.to_detail_string()));
}
}
void peerplays_sidechain_plugin_impl::plugin_set_program_options(
@ -163,7 +175,6 @@ void peerplays_sidechain_plugin_impl::plugin_initialize(const boost::program_opt
}
plugin.database().applied_block.connect( [&] (const signed_block& b) { on_applied_block(b); } );
plugin.database().new_objects.connect( [&] (const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts) { on_new_objects(ids); } );
net_manager = std::unique_ptr<sidechain_net_manager>(new sidechain_net_manager(plugin));
@ -323,14 +334,126 @@ void peerplays_sidechain_plugin_impl::heartbeat_loop()
}
}
void peerplays_sidechain_plugin_impl::schedule_son_processing()
{
fc::time_point now = fc::time_point::now();
int64_t time_to_next_son_processing = 500000;
fc::time_point next_wakeup( now + fc::microseconds( time_to_next_son_processing ) );
_son_processing_task = fc::schedule([this]{son_processing();},
next_wakeup, "SON Processing");
}
void peerplays_sidechain_plugin_impl::son_processing()
{
if (plugin.database().get_global_properties().active_sons.size() <= 0) {
return;
}
chain::son_id_type next_son_id = plugin.database().get_scheduled_son(1);
ilog("peerplays_sidechain_plugin_impl: Scheduled SON ${son}",("son", next_son_id));
// Tasks that are executed by all active SONs, no matter if scheduled
// E.g. sending approvals and signing
approve_proposals();
// Tasks that are executed by scheduled and active SON
if( _sons.find( next_son_id ) != _sons.end() ) {
current_son_id = next_son_id;
create_son_down_proposals();
recreate_primary_wallet();
process_deposits();
process_withdrawals();
}
}
void peerplays_sidechain_plugin_impl::approve_proposals()
{
auto approve_proposal = [ & ]( const chain::son_id_type& son_id, const chain::proposal_id_type& proposal_id )
{
ilog("peerplays_sidechain_plugin: sending approval for ${p} from ${s}", ("p", proposal_id) ("s", son_id));
chain::proposal_update_operation puo;
puo.fee_paying_account = get_son_object(son_id).son_account;
puo.proposal = proposal_id;
puo.active_approvals_to_add = { get_son_object(son_id).son_account };
chain::signed_transaction trx = plugin.database().create_signed_transaction(plugin.get_private_key(son_id), puo);
fc::future<bool> fut = fc::async( [&](){
try {
plugin.database().push_transaction(trx, database::validation_steps::skip_block_size_check);
if(plugin.app().p2p_node())
plugin.app().p2p_node()->broadcast(net::trx_message(trx));
return true;
} catch(fc::exception e){
ilog("peerplays_sidechain_plugin_impl: sending approval failed with exception ${e}",("e", e.what()));
return false;
}
});
fut.wait(fc::seconds(10));
};
const auto& idx = plugin.database().get_index_type<proposal_index>().indices().get<by_id>();
for (const auto &proposal : idx) {
for (son_id_type son_id : _sons) {
if (!is_active_son(son_id)) {
continue;
}
if (proposal.available_active_approvals.find(get_son_object(son_id).son_account) != proposal.available_active_approvals.end()) {
continue;
}
if(proposal.proposed_transaction.operations.size() == 1
&& proposal.proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_report_down_operation>::value) {
approve_proposal( son_id, proposal.id );
continue;
}
if(proposal.proposed_transaction.operations.size() == 1
&& proposal.proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_update_operation>::value) {
approve_proposal( son_id, proposal.id );
continue;
}
if(proposal.proposed_transaction.operations.size() == 1
&& proposal.proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_deposit_create_operation>::value) {
approve_proposal( son_id, proposal.id );
continue;
}
if(proposal.proposed_transaction.operations.size() == 1
&& proposal.proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_deposit_process_operation>::value) {
approve_proposal( son_id, proposal.id );
continue;
}
if(proposal.proposed_transaction.operations.size() == 1
&& proposal.proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_withdraw_create_operation>::value) {
approve_proposal( son_id, proposal.id );
continue;
}
if(proposal.proposed_transaction.operations.size() == 1
&& proposal.proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_withdraw_process_operation>::value) {
approve_proposal( son_id, proposal.id );
continue;
}
}
}
}
void peerplays_sidechain_plugin_impl::create_son_down_proposals()
{
auto create_son_down_proposal = [&](chain::son_id_type son_id, fc::time_point_sec last_active_ts) {
chain::database& d = plugin.database();
chain::son_id_type my_son_id = *(_sons.begin());
const chain::global_property_object& gpo = d.get_global_properties();
const auto& idx = d.get_index_type<chain::son_index>().indices().get<by_id>();
auto son_obj = idx.find( my_son_id );
chain::son_report_down_operation son_down_op;
son_down_op.payer = GRAPHENE_SON_ACCOUNT;
@ -338,7 +461,7 @@ void peerplays_sidechain_plugin_impl::create_son_down_proposals()
son_down_op.down_ts = last_active_ts;
proposal_create_operation proposal_op;
proposal_op.fee_paying_account = son_obj->son_account;
proposal_op.fee_paying_account = get_son_object(plugin.get_current_son_id()).son_account;
proposal_op.proposed_ops.push_back( op_wrapper( son_down_op ) );
uint32_t lifetime = ( gpo.parameters.block_interval * gpo.active_witnesses.size() ) * 3;
proposal_op.expiration_time = time_point_sec( d.head_block_time().sec_since_epoch() + lifetime );
@ -350,7 +473,7 @@ void peerplays_sidechain_plugin_impl::create_son_down_proposals()
const chain::dynamic_global_property_object& dgpo = d.get_dynamic_global_properties();
const auto& idx = d.get_index_type<chain::son_index>().indices().get<by_id>();
std::set<son_id_type> sons_being_reported_down = d.get_sons_being_reported_down();
chain::son_id_type my_son_id = *(_sons.begin());
chain::son_id_type my_son_id = get_current_son_id();
for(auto son_inf: gpo.active_sons) {
if(my_son_id == son_inf.son_id || (sons_being_reported_down.find(son_inf.son_id) != sons_being_reported_down.end())){
continue;
@ -364,7 +487,7 @@ void peerplays_sidechain_plugin_impl::create_son_down_proposals()
((fc::time_point::now() - last_active_ts) > fc::microseconds(down_threshold))) {
ilog("peerplays_sidechain_plugin: sending son down proposal for ${t} from ${s}",("t",std::string(object_id_type(son_obj->id)))("s",std::string(object_id_type(my_son_id))));
chain::proposal_create_operation op = create_son_down_proposal(son_inf.son_id, last_active_ts);
chain::signed_transaction trx = d.create_signed_transaction(plugin.get_private_key(son_obj->signing_key), op);
chain::signed_transaction trx = d.create_signed_transaction(plugin.get_private_key(get_son_object(my_son_id).signing_key), op);
fc::future<bool> fut = fc::async( [&](){
try {
d.push_transaction(trx, database::validation_steps::skip_block_size_check);
@ -398,110 +521,7 @@ void peerplays_sidechain_plugin_impl::process_withdrawals()
void peerplays_sidechain_plugin_impl::on_applied_block( const signed_block& b )
{
chain::database& d = plugin.database();
const chain::global_property_object& gpo = d.get_global_properties();
bool latest_block = ((fc::time_point::now() - b.timestamp) < fc::microseconds(gpo.parameters.block_interval * 1000000));
if(gpo.active_sons.size() <= 0 || !latest_block) {
return;
}
chain::son_id_type next_son_id = d.get_scheduled_son(1);
ilog("peerplays_sidechain_plugin_impl: Scheduled SON ${son}",("son", next_son_id));
// check if we control scheduled SON
if( _sons.find( next_son_id ) != _sons.end() ) {
current_son_id = next_son_id;
create_son_down_proposals();
recreate_primary_wallet();
process_deposits();
process_withdrawals();
}
}
void peerplays_sidechain_plugin_impl::on_new_objects(const vector<object_id_type>& new_object_ids)
{
auto approve_proposal = [ & ]( const chain::son_id_type& son_id, const chain::proposal_id_type& proposal_id )
{
ilog("peerplays_sidechain_plugin: sending approval for ${p} from ${s}", ("p", proposal_id) ("s", son_id));
chain::proposal_update_operation puo;
puo.fee_paying_account = get_son_object(son_id).son_account;
puo.proposal = proposal_id;
puo.active_approvals_to_add = { get_son_object(son_id).son_account };
chain::signed_transaction trx = plugin.database().create_signed_transaction(plugin.get_private_key(son_id), puo);
fc::future<bool> fut = fc::async( [&](){
try {
plugin.database().push_transaction(trx, database::validation_steps::skip_block_size_check);
if(plugin.app().p2p_node())
plugin.app().p2p_node()->broadcast(net::trx_message(trx));
return true;
} catch(fc::exception e){
ilog("peerplays_sidechain_plugin_impl: sending approval failed with exception ${e}",("e", e.what()));
return false;
}
});
fut.wait(fc::seconds(10));
};
for(auto object_id: new_object_ids) {
if( object_id.is<chain::proposal_object>() ) {
for (son_id_type son_id : _sons) {
if (!is_active_son(son_id)) {
continue;
}
const object* obj = plugin.database().find_object(object_id);
const chain::proposal_object* proposal = dynamic_cast<const chain::proposal_object*>(obj);
if(proposal == nullptr || (proposal->available_active_approvals.find(get_son_object(son_id).son_account) != proposal->available_active_approvals.end())) {
continue;
}
if(proposal->proposed_transaction.operations.size() == 1
&& proposal->proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_report_down_operation>::value) {
approve_proposal( son_id, proposal->id );
continue;
}
if(proposal->proposed_transaction.operations.size() == 1
&& proposal->proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_update_operation>::value) {
approve_proposal( son_id, proposal->id );
continue;
}
if(proposal->proposed_transaction.operations.size() == 1
&& proposal->proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_deposit_create_operation>::value) {
approve_proposal( son_id, proposal->id );
continue;
}
if(proposal->proposed_transaction.operations.size() == 1
&& proposal->proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_deposit_process_operation>::value) {
approve_proposal( son_id, proposal->id );
continue;
}
if(proposal->proposed_transaction.operations.size() == 1
&& proposal->proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_withdraw_create_operation>::value) {
approve_proposal( son_id, proposal->id );
continue;
}
if(proposal->proposed_transaction.operations.size() == 1
&& proposal->proposed_transaction.operations[0].which() == chain::operation::tag<chain::son_wallet_withdraw_process_operation>::value) {
approve_proposal( son_id, proposal->id );
continue;
}
}
}
}
schedule_son_processing();
}
} // end namespace detail
@ -525,7 +545,6 @@ void peerplays_sidechain_plugin::plugin_set_program_options(
boost::program_options::options_description& cli,
boost::program_options::options_description& cfg)
{
ilog("peerplays sidechain plugin: plugin_set_program_options()");
my->plugin_set_program_options(cli, cfg);
}

View file

@ -87,7 +87,6 @@ void sidechain_net_handler::sidechain_event_data_received(const sidechain_event_
uint32_t lifetime = ( gpo.parameters.block_interval * gpo.active_witnesses.size() ) * 3;
proposal_op.expiration_time = time_point_sec( database.head_block_time().sec_since_epoch() + lifetime );
//ilog("sidechain_net_handler: sending proposal for son wallet deposit create operation by ${son}", ("son", son_id));
signed_transaction trx = plugin.database().create_signed_transaction(plugin.get_private_key(son_id), proposal_op);
try {
database.push_transaction(trx, database::validation_steps::skip_block_size_check);
@ -130,7 +129,6 @@ void sidechain_net_handler::sidechain_event_data_received(const sidechain_event_
uint32_t lifetime = ( gpo.parameters.block_interval * gpo.active_witnesses.size() ) * 3;
proposal_op.expiration_time = time_point_sec( database.head_block_time().sec_since_epoch() + lifetime );
//ilog("sidechain_net_handler: sending proposal for son wallet withdraw create operation by ${son}", ("son", son_id));
signed_transaction trx = plugin.database().create_signed_transaction(plugin.get_private_key(son_id), proposal_op);
try {
database.push_transaction(trx, database::validation_steps::skip_block_size_check);
@ -147,10 +145,6 @@ void sidechain_net_handler::sidechain_event_data_received(const sidechain_event_
FC_ASSERT(false, "Invalid sidechain event");
}
void sidechain_net_handler::recreate_primary_wallet() {
FC_ASSERT(false, "recreate_primary_wallet not implemented");
}
void sidechain_net_handler::process_deposits() {
const auto& idx = plugin.database().get_index_type<son_wallet_deposit_index>().indices().get<by_sidechain_and_processed>();
const auto& idx_range = idx.equal_range(std::make_tuple(sidechain, false));
@ -221,5 +215,18 @@ void sidechain_net_handler::process_withdrawals() {
});
}
void sidechain_net_handler::recreate_primary_wallet() {
FC_ASSERT(false, "recreate_primary_wallet not implemented");
}
void sidechain_net_handler::process_deposit(const son_wallet_deposit_object& swdo) {
FC_ASSERT(false, "process_deposit not implemented");
}
void sidechain_net_handler::process_withdrawal(const son_wallet_withdraw_object& swwo) {
FC_ASSERT(false, "process_withdrawal not implemented");
}
} } // graphene::peerplays_sidechain

View file

@ -482,19 +482,11 @@ void sidechain_net_handler_bitcoin::recreate_primary_wallet() {
}
}
void sidechain_net_handler_bitcoin::process_deposits() {
sidechain_net_handler::process_deposits();
}
void sidechain_net_handler_bitcoin::process_deposit(const son_wallet_deposit_object &swdo) {
ilog(__FUNCTION__);
transfer_deposit_to_primary_wallet(swdo);
}
void sidechain_net_handler_bitcoin::process_withdrawals() {
sidechain_net_handler::process_withdrawals();
}
void sidechain_net_handler_bitcoin::process_withdrawal(const son_wallet_withdraw_object &swwo) {
ilog(__FUNCTION__);
transfer_withdrawal_from_primary_wallet(swwo);

View file

@ -31,17 +31,9 @@ sidechain_net_handler_peerplays::~sidechain_net_handler_peerplays() {
void sidechain_net_handler_peerplays::recreate_primary_wallet() {
}
void sidechain_net_handler_peerplays::process_deposits() {
sidechain_net_handler::process_deposits();
}
void sidechain_net_handler_peerplays::process_deposit(const son_wallet_deposit_object& swdo) {
}
void sidechain_net_handler_peerplays::process_withdrawals() {
sidechain_net_handler::process_withdrawals();
}
void sidechain_net_handler_peerplays::process_withdrawal(const son_wallet_withdraw_object& swwo) {
}