Merge branch 'develop' into local_test_block_events

This commit is contained in:
hirunda 2023-05-18 14:11:22 +02:00
commit ed9ffc4a39
27 changed files with 1093 additions and 423 deletions

View file

@ -9,6 +9,7 @@ stages:
- build
- test
- dockerize
- python-test
build-mainnet:
stage: build
@ -48,6 +49,7 @@ dockerize-mainnet:
IMAGE: $CI_REGISTRY_IMAGE/mainnet/$CI_COMMIT_REF_SLUG:$CI_COMMIT_SHA
before_script:
- docker info
- docker builder prune -a -f
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker build --no-cache -t $IMAGE .
@ -56,8 +58,6 @@ dockerize-mainnet:
- docker rmi $IMAGE
tags:
- builder
when:
manual
timeout:
3h
@ -119,3 +119,32 @@ dockerize-testnet:
manual
timeout:
3h
test-e2e:
stage: python-test
variables:
IMAGE: $CI_REGISTRY_IMAGE/mainnet/$CI_COMMIT_REF_SLUG:$CI_COMMIT_SHA
before_script:
- docker info
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- git clone https://gitlab.com/PBSA/tools-libs/peerplays-utils.git
- cd peerplays-utils/peerplays-qa-environment
- git checkout origin/feature/python-e2e-tests-for-CI
- cd e2e-tests/
- python3 -m venv venv
- source venv/bin/activate
- pip3 install -r requirements.txt
- python3 main.py --stop
- docker ps -a
- docker pull $IMAGE
- docker tag $IMAGE peerplays-base:latest
- docker image ls -a
- python3 main.py --start all
- docker ps -a
- python3 -m pytest test_btc_init_state.py test_hive_inital_state.py test_pp_inital_state.py
- python3 main.py --stop
- deactivate
- docker ps -a
tags:
- python-tests

View file

@ -136,6 +136,7 @@ RUN \
RUN \
git clone https://github.com/libbitcoin/libbitcoin-build.git && \
cd libbitcoin-build && \
git reset --hard 92c215fc1ffa272bab4d485d369d0306db52d69d && \
./generate3.sh && \
cd ../libbitcoin-explorer && \
./install.sh && \
@ -189,7 +190,6 @@ ADD . peerplays
RUN \
cd peerplays && \
git submodule update --init --recursive && \
git symbolic-ref --short HEAD && \
git log --oneline -n 5 && \
mkdir build && \
cd build && \

View file

@ -136,6 +136,7 @@ RUN \
RUN \
git clone https://github.com/libbitcoin/libbitcoin-build.git && \
cd libbitcoin-build && \
git reset --hard 92c215fc1ffa272bab4d485d369d0306db52d69d && \
./generate3.sh && \
cd ../libbitcoin-explorer && \
./install.sh && \

View file

@ -79,6 +79,7 @@ libbitcoin-explorer setup:
```
git clone https://github.com/libbitcoin/libbitcoin-build.git
cd libbitcoin-build
git reset --hard 92c215fc1ffa272bab4d485d369d0306db52d69d
./generate3.sh
cd ../libbitcoin-explorer
sudo ./install.sh

View file

@ -53,7 +53,54 @@ void verify_authority_accounts( const database& db, const authority& a )
}
}
void verify_account_votes( const database& db, const account_options& options )
// Overwrites the num_son values from the origin to the destination for those sidechains which are found in the origin.
// Keeps the values of num_son for the sidechains which are found in the destination, but not in the origin.
// Returns false if an error is detected.
bool merge_num_sons( flat_map<sidechain_type, uint16_t>& destination,
const flat_map<sidechain_type, uint16_t>& origin,
fc::optional<time_point_sec> head_block_time = {})
{
const auto active_sidechains = head_block_time.valid() ? active_sidechain_types(*head_block_time) : all_sidechain_types;
bool success = true;
for (const auto &ns : origin)
{
destination[ns.first] = ns.second;
if (active_sidechains.find(ns.first) == active_sidechains.end())
{
success = false;
}
}
return success;
}
flat_map<sidechain_type, uint16_t> count_SON_votes_per_sidechain( const flat_set<vote_id_type>& votes )
{
flat_map<sidechain_type, uint16_t> SON_votes_per_sidechain = account_options::ext::empty_num_son();
for (const auto &vote : votes)
{
switch (vote.type())
{
case vote_id_type::son_bitcoin:
SON_votes_per_sidechain[sidechain_type::bitcoin]++;
break;
case vote_id_type::son_hive:
SON_votes_per_sidechain[sidechain_type::hive]++;
break;
case vote_id_type::son_ethereum:
SON_votes_per_sidechain[sidechain_type::ethereum]++;
break;
default:
break;
}
}
return SON_votes_per_sidechain;
}
void verify_account_votes( const database& db, const account_options& options, fc::optional<account_object> account = {} )
{
// ensure account's votes satisfy requirements
// NB only the part of vote checking that requires chain state is here,
@ -69,14 +116,40 @@ void verify_account_votes( const database& db, const account_options& options )
FC_ASSERT( options.num_committee <= chain_params.maximum_committee_count,
"Voted for more committee members than currently allowed (${c})", ("c", chain_params.maximum_committee_count) );
FC_ASSERT( chain_params.extensions.value.maximum_son_count.valid() , "Invalid maximum son count" );
flat_map<sidechain_type, uint16_t> merged_num_sons = account_options::ext::empty_num_son();
// Merge with existing account if exists
if ( account.valid() && account->options.extensions.value.num_son.valid())
{
merge_num_sons( merged_num_sons, *account->options.extensions.value.num_son, db.head_block_time() );
}
// Apply update operation on top
if ( options.extensions.value.num_son.valid() )
{
for(const auto& num_sons : *options.extensions.value.num_son)
merge_num_sons( merged_num_sons, *options.extensions.value.num_son, db.head_block_time() );
}
for(const auto& num_sons : merged_num_sons)
{
FC_ASSERT( num_sons.second <= *chain_params.extensions.value.maximum_son_count,
"Voted for more sons than currently allowed (${c})", ("c", *chain_params.extensions.value.maximum_son_count) );
}
// Count the votes for SONs and confirm that the account did not vote for less SONs than num_son
flat_map<sidechain_type, uint16_t> SON_votes_per_sidechain = count_SON_votes_per_sidechain(options.votes);
for (const auto& number_of_votes : SON_votes_per_sidechain)
{
// Number of votes of account_options are also checked in account_options::do_evaluate,
// but there we are checking the value before merging num_sons, so the values should be checked again
const auto sidechain = number_of_votes.first;
FC_ASSERT( number_of_votes.second >= merged_num_sons[sidechain],
"Voted for less sons than specified in num_son (votes ${v} < num_son ${ns}) for sidechain ${s}",
("v", number_of_votes.second) ("ns", merged_num_sons[sidechain]) ("s", sidechain) );
}
FC_ASSERT( db.find_object(options.voting_account), "Invalid proxy account specified." );
uint32_t max_vote_id = gpo.next_available_vote_id;
@ -191,9 +264,10 @@ object_id_type account_create_evaluator::do_apply( const account_create_operatio
obj.active = o.active;
obj.options = o.options;
if (!obj.options.extensions.value.num_son.valid())
obj.options.extensions.value.num_son = account_options::ext::empty_num_son();
if ( o.options.extensions.value.num_son.valid() )
{
obj.options.extensions.value = account_options::ext();
merge_num_sons( *obj.options.extensions.value.num_son, *o.options.extensions.value.num_son );
}
obj.statistics = d.create<account_statistics_object>([&obj](account_statistics_object& s){
@ -295,7 +369,7 @@ void_result account_update_evaluator::do_evaluate( const account_update_operatio
acnt = &o.account(d);
if( o.new_options.valid() )
verify_account_votes( d, *o.new_options );
verify_account_votes( d, *o.new_options, *acnt );
return void_result();
} FC_CAPTURE_AND_RETHROW( (o) ) }
@ -334,7 +408,31 @@ void_result account_update_evaluator::do_apply( const account_update_operation&
a.active = *o.active;
a.top_n_control_flags = 0;
}
if( o.new_options ) a.options = *o.new_options;
// New num_son structure initialized to 0
flat_map<sidechain_type, uint16_t> new_num_son = account_options::ext::empty_num_son();
// If num_son of existing object is valid, we should merge the existing data
if ( a.options.extensions.value.num_son.valid() )
{
merge_num_sons( new_num_son, *a.options.extensions.value.num_son );
}
// If num_son of the operation are valid, they should merge the existing data
if ( o.new_options )
{
const auto new_options = *o.new_options;
if ( new_options.extensions.value.num_son.valid() )
{
merge_num_sons( new_num_son, *new_options.extensions.value.num_son );
}
a.options = *o.new_options;
}
a.options.extensions.value.num_son = new_num_son;
if( o.extensions.value.owner_special_authority.valid() )
{
a.owner_special_authority = *(o.extensions.value.owner_special_authority);

View file

@ -42,15 +42,22 @@ namespace graphene { namespace chain {
{
/// The number of active son members this account votes the blockchain should appoint
/// Must not exceed the actual number of son members voted for in @ref votes
optional< flat_map<sidechain_type, uint16_t> > num_son = []{
optional< flat_map<sidechain_type, uint16_t> > num_son;
/// Returns and empty num_son map with all sidechains
static flat_map<sidechain_type, uint16_t> empty_num_son()
{
flat_map<sidechain_type, uint16_t> num_son;
for(const auto& active_sidechain_type : all_sidechain_types){
for(const auto& active_sidechain_type : all_sidechain_types)
{
num_son[active_sidechain_type] = 0;
}
return num_son;
}();
}
};
/// The memo key is the key this account will typically use to encrypt/sign transaction memos and other non-
/// validated account activities. This field is here to prevent confusion if the active authority has zero or
/// multiple keys in it.

View file

@ -18,7 +18,7 @@ namespace graphene
// Buyer purchasing lottery tickets
account_id_type buyer;
// count of tickets to buy
uint64_t tickets_to_buy;
share_type tickets_to_buy;
// amount that can spent
asset amount;

View file

@ -36,6 +36,15 @@ namespace graphene { namespace chain {
deposit_address(""),
withdraw_public_key(""),
withdraw_address("") {}
inline string get_deposit_address() const {
if(sidechain_type::ethereum != sidechain)
return deposit_address;
auto deposit_address_lower = deposit_address;
std::transform(deposit_address_lower.begin(), deposit_address_lower.end(), deposit_address_lower.begin(), ::tolower);
return deposit_address_lower;
}
};
struct by_account;
@ -76,7 +85,7 @@ namespace graphene { namespace chain {
ordered_non_unique< tag<by_sidechain_and_deposit_address_and_expires>,
composite_key<sidechain_address_object,
member<sidechain_address_object, sidechain_type, &sidechain_address_object::sidechain>,
member<sidechain_address_object, string, &sidechain_address_object::deposit_address>,
const_mem_fun<sidechain_address_object, string, &sidechain_address_object::get_deposit_address>,
member<sidechain_address_object, time_point_sec, &sidechain_address_object::expires>
>
>

View file

@ -30,7 +30,7 @@ namespace graphene
auto lottery_options = lottery_md_obj.lottery_data->lottery_options;
FC_ASSERT(lottery_options.ticket_price.asset_id == op.amount.asset_id);
FC_ASSERT((double)op.amount.amount.value / lottery_options.ticket_price.amount.value == (double)op.tickets_to_buy);
FC_ASSERT(op.tickets_to_buy * lottery_options.ticket_price.amount.value == op.amount.amount.value);
return void_result();
}
FC_CAPTURE_AND_RETHROW((op))

View file

@ -128,6 +128,124 @@ namespace graphene { namespace net {
namespace detail
{
namespace bmi = boost::multi_index;
/*******
* A class to wrap std::unordered_set for multithreading
*/
template <class Key, class Hash = std::hash<Key>, class Pred = std::equal_to<Key> >
class concurrent_unordered_set : private std::unordered_set<Key, Hash, Pred>
{
private:
mutable fc::mutex mux;
public:
/// Iterations require a lock. This exposes the mutex. Use with care (i.e. lock_guard)
fc::mutex& get_mutex()const { return mux; }
/// Insertion
/// @{
std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> emplace( Key key)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::emplace( key );
}
std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> insert (const Key& val)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::insert( val );
}
/// @}
/// Size
/// @{
size_t size() const
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::size();
}
bool empty() const noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::empty();
}
/// @}
/// Removal
/// @{
void clear() noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
std::unordered_set<Key, Hash, Pred>::clear();
}
typename std::unordered_set<Key, Hash, Pred>::iterator erase(
typename std::unordered_set<Key, Hash, Pred>::const_iterator itr)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::erase( itr);
}
size_t erase( const Key& key)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::erase( key );
}
/// @}
/// Swap
/// @{
void swap( typename std::unordered_set<Key, Hash, Pred>& other ) noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
std::unordered_set<Key, Hash, Pred>::swap( other );
}
/// @}
/// Iteration
/// @{
typename std::unordered_set<Key, Hash, Pred>::iterator begin() noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin();
}
typename std::unordered_set<Key, Hash, Pred>::const_iterator begin() const noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin();
}
typename std::unordered_set<Key, Hash, Pred>::local_iterator begin(size_t n)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin(n);
}
typename std::unordered_set<Key, Hash, Pred>::const_local_iterator begin(size_t n) const
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin(n);
}
typename std::unordered_set<Key, Hash, Pred>::iterator end() noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end();
}
typename std::unordered_set<Key, Hash, Pred>::const_iterator end() const noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end();
}
typename std::unordered_set<Key, Hash, Pred>::local_iterator end(size_t n)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end(n);
}
typename std::unordered_set<Key, Hash, Pred>::const_local_iterator end(size_t n) const
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end(n);
}
/// @}
/// Search
typename std::unordered_set<Key, Hash, Pred>::const_iterator find(Key key)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::find(key);
}
};
class blockchain_tied_message_cache
{
private:
@ -483,7 +601,7 @@ namespace graphene { namespace net { namespace detail {
// @{
fc::promise<void>::ptr _retrigger_advertise_inventory_loop_promise;
fc::future<void> _advertise_inventory_loop_done;
std::unordered_set<item_id> _new_inventory; /// list of items we have received but not yet advertised to our peers
concurrent_unordered_set<item_id> _new_inventory; /// list of items we have received but not yet advertised to our peers
// @}
fc::future<void> _terminate_inactive_connections_loop_done;
@ -519,13 +637,13 @@ namespace graphene { namespace net { namespace detail {
/** Stores all connections which have not yet finished key exchange or are still sending initial handshaking messages
* back and forth (not yet ready to initiate syncing) */
std::unordered_set<peer_connection_ptr> _handshaking_connections;
concurrent_unordered_set<peer_connection_ptr> _handshaking_connections;
/** stores fully established connections we're either syncing with or in normal operation with */
std::unordered_set<peer_connection_ptr> _active_connections;
concurrent_unordered_set<peer_connection_ptr> _active_connections;
/** stores connections we've closed (sent closing message, not actually closed), but are still waiting for the remote end to close before we delete them */
std::unordered_set<peer_connection_ptr> _closing_connections;
concurrent_unordered_set<peer_connection_ptr> _closing_connections;
/** stores connections we've closed, but are still waiting for the OS to notify us that the socket is really closed */
std::unordered_set<peer_connection_ptr> _terminating_connections;
concurrent_unordered_set<peer_connection_ptr> _terminating_connections;
boost::circular_buffer<item_hash_t> _most_recent_blocks_accepted; // the /n/ most recent blocks we've accepted (currently tuned to the max number of connections)
@ -854,6 +972,8 @@ namespace graphene { namespace net { namespace detail {
ilog( "cleaning up node" );
_node_is_shutting_down.store(true);
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_peer : _active_connections)
{
fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
@ -867,6 +987,7 @@ namespace graphene { namespace net { namespace detail {
}
}
}
}
try
{
@ -1061,6 +1182,7 @@ namespace graphene { namespace net { namespace detail {
std::set<item_hash_t> sync_items_to_request;
// for each idle peer that we're syncing with
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
if( peer->we_need_sync_items_from_peer &&
@ -1119,6 +1241,7 @@ namespace graphene { namespace net { namespace detail {
bool node_impl::is_item_in_any_peers_inventory(const item_id& item) const
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
@ -1158,9 +1281,13 @@ namespace graphene { namespace net { namespace detail {
fetch_messages_to_send_set items_by_peer;
// initialize the fetch_messages_to_send with an empty set of items for all idle peers
for (const peer_connection_ptr& peer : _active_connections)
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections) {
if (peer->idle())
items_by_peer.insert(peer_and_items_to_fetch(peer));
}
}
// now loop over all items we want to fetch
for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
@ -1262,13 +1389,15 @@ namespace graphene { namespace net { namespace detail {
dlog("beginning an iteration of advertise inventory");
// swap inventory into local variable, clearing the node's copy
std::unordered_set<item_id> inventory_to_advertise;
inventory_to_advertise.swap(_new_inventory);
_new_inventory.swap(inventory_to_advertise);
// process all inventory to advertise and construct the inventory messages we'll send
// first, then send them all in a batch (to avoid any fiber interruption points while
// we're computing the messages)
std::list<std::pair<peer_connection_ptr, item_ids_inventory_message> > inventory_messages_to_send;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
// only advertise to peers who are in sync with us
@ -1313,6 +1442,7 @@ namespace graphene { namespace net { namespace detail {
}
peer->clear_old_inventory();
}
}
for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter)
iter->first->send_message(iter->second);
@ -1360,7 +1490,10 @@ namespace graphene { namespace net { namespace detail {
uint32_t handshaking_timeout = _peer_inactivity_timeout;
fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout);
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for( const peer_connection_ptr handshaking_peer : _handshaking_connections )
{
if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold &&
handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold &&
handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold )
@ -1379,6 +1512,8 @@ namespace graphene { namespace net { namespace detail {
("received", handshaking_peer->get_total_bytes_received())));
peers_to_disconnect_forcibly.push_back( handshaking_peer );
}
}
}
// timeout for any active peers is two block intervals
uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds;
@ -1398,6 +1533,8 @@ namespace graphene { namespace net { namespace detail {
fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout);
fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout);
fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& active_peer : _active_connections )
{
if( active_peer->connection_initiation_time < active_disconnect_threshold &&
@ -1465,27 +1602,34 @@ namespace graphene { namespace net { namespace detail {
}
}
}
}
fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT);
for( const peer_connection_ptr& closing_peer : _closing_connections )
if( closing_peer->connection_closed_time < closing_disconnect_threshold )
{
fc::scoped_lock<fc::mutex> lock(_closing_connections.get_mutex());
for( const peer_connection_ptr& closing_peer : _closing_connections ) {
if (closing_peer->connection_closed_time < closing_disconnect_threshold) {
// we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT
// seconds ago, but they haven't done it yet. Terminate the connection now
wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
( "peer", closing_peer->get_remote_endpoint() ) );
peers_to_disconnect_forcibly.push_back( closing_peer );
wlog("Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
("peer", closing_peer->get_remote_endpoint()));
peers_to_disconnect_forcibly.push_back(closing_peer);
}
}
}
uint32_t failed_terminate_timeout_seconds = 120;
fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds);
for (const peer_connection_ptr& peer : _terminating_connections )
if (peer->get_connection_terminated_time() != fc::time_point::min() &&
peer->get_connection_terminated_time() < failed_terminate_threshold)
{
fc::scoped_lock<fc::mutex> lock(_terminating_connections.get_mutex());
for (const peer_connection_ptr& peer : _terminating_connections ) {
if (peer->get_connection_terminated_time() != fc::time_point::min() &&
peer->get_connection_terminated_time() < failed_terminate_threshold) {
wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint()));
peers_to_terminate.push_back(peer);
}
}
}
// That's the end of the sorting step; now all peers that require further processing are now in one of the
// lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate
@ -1493,12 +1637,15 @@ namespace graphene { namespace net { namespace detail {
// if we've decided to delete any peers, do it now; in its current implementation this doesn't yield,
// and once we start yielding, we may find that we've moved that peer to another list (closed or active)
// and that triggers assertions, maybe even errors
{
fc::scoped_lock<fc::mutex> lock(_terminating_connections.get_mutex());
for (const peer_connection_ptr& peer : peers_to_terminate )
{
assert(_terminating_connections.find(peer) != _terminating_connections.end());
_terminating_connections.erase(peer);
schedule_peer_for_deletion(peer);
}
}
peers_to_terminate.clear();
// if we're going to abruptly disconnect anyone, do it here
@ -1516,6 +1663,7 @@ namespace graphene { namespace net { namespace detail {
// disconnect reason, so it may yield)
for( const peer_connection_ptr& peer : peers_to_disconnect_gently )
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity",
( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
@ -1539,6 +1687,7 @@ namespace graphene { namespace net { namespace detail {
{
VERIFY_CORRECT_THREAD();
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(), _active_connections.end());
for( const peer_connection_ptr& active_peer : original_active_peers )
{
@ -1710,12 +1859,19 @@ namespace graphene { namespace net { namespace detail {
peer_connection_ptr node_impl::get_peer_by_node_id(const node_id_t& node_id)
{
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_peer : _active_connections)
if (node_id == active_peer->node_id)
return active_peer;
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for (const peer_connection_ptr& handshaking_peer : _handshaking_connections)
if (node_id == handshaking_peer->node_id)
return handshaking_peer;
}
return peer_connection_ptr();
}
@ -1727,18 +1883,25 @@ namespace graphene { namespace net { namespace detail {
dlog("is_already_connected_to_id returning true because the peer is us");
return true;
}
for (const peer_connection_ptr active_peer : _active_connections)
if (node_id == active_peer->node_id)
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr active_peer : _active_connections) {
if (node_id == active_peer->node_id) {
dlog("is_already_connected_to_id returning true because the peer is already in our active list");
return true;
}
for (const peer_connection_ptr handshaking_peer : _handshaking_connections)
if (node_id == handshaking_peer->node_id)
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for (const peer_connection_ptr handshaking_peer : _handshaking_connections) {
if (node_id == handshaking_peer->node_id) {
dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list");
return true;
}
}
}
return false;
}
@ -1770,6 +1933,8 @@ namespace graphene { namespace net { namespace detail {
("max", _maximum_number_of_connections));
dlog(" my id is ${id}", ("id", _node_id));
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_connection : _active_connections)
{
dlog(" active: ${endpoint} with ${id} [${direction}]",
@ -1777,6 +1942,9 @@ namespace graphene { namespace net { namespace detail {
("id", active_connection->node_id)
("direction", active_connection->direction));
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for (const peer_connection_ptr& handshaking_connection : _handshaking_connections)
{
dlog(" handshaking: ${endpoint} with ${id} [${direction}]",
@ -1785,6 +1953,7 @@ namespace graphene { namespace net { namespace detail {
("direction", handshaking_connection->direction));
}
}
}
void node_impl::on_message( peer_connection* originating_peer, const message& received_message )
{
@ -2229,6 +2398,7 @@ namespace graphene { namespace net { namespace detail {
if (!_peer_advertising_disabled)
{
reply.addresses.reserve(_active_connections.size());
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_peer : _active_connections)
{
fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint());
@ -2414,12 +2584,15 @@ namespace graphene { namespace net { namespace detail {
{
VERIFY_CORRECT_THREAD();
uint32_t max_number_of_unfetched_items = 0;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids;
max_number_of_unfetched_items = std::max(max_number_of_unfetched_items,
this_peer_number_of_unfetched_items);
}
}
return max_number_of_unfetched_items;
}
@ -2633,17 +2806,19 @@ namespace graphene { namespace net { namespace detail {
originating_peer->ids_of_items_to_get.empty())
{
bool is_first_item_for_other_peer = false;
for (const peer_connection_ptr& peer : _active_connections)
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections) {
if (peer != originating_peer->shared_from_this() &&
!peer->ids_of_items_to_get.empty() &&
peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
{
peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) {
dlog("The item ${newitem} is the first item for peer ${peer}",
("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())
("peer", peer->get_remote_endpoint()));
("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())("peer", peer->get_remote_endpoint()));
is_first_item_for_other_peer = true;
break;
}
}
}
dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
if (!is_first_item_for_other_peer)
@ -2933,6 +3108,8 @@ namespace graphene { namespace net { namespace detail {
item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash);
bool we_advertised_this_item_to_a_peer = false;
bool we_requested_this_item_from_a_peer = false;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr peer : _active_connections)
{
if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end())
@ -2943,6 +3120,7 @@ namespace graphene { namespace net { namespace detail {
if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end())
we_requested_this_item_from_a_peer = true;
}
}
// if we have already advertised it to a peer, we must have it, no need to do anything else
if (!we_advertised_this_item_to_a_peer)
@ -3172,6 +3350,7 @@ namespace graphene { namespace net { namespace detail {
};
bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num());
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3254,6 +3433,7 @@ namespace graphene { namespace net { namespace detail {
else
{
// invalid message received
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3356,6 +3536,8 @@ namespace graphene { namespace net { namespace detail {
// find out if this block is the next block on the active chain or one of the forks
bool potential_first_block = false;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3367,6 +3549,7 @@ namespace graphene { namespace net { namespace detail {
peer->ids_of_items_being_processed.insert(received_block_iter->block_id);
}
}
}
// if it is, process it, remove it from all sync peers lists
if (potential_first_block)
@ -3392,6 +3575,7 @@ namespace graphene { namespace net { namespace detail {
{
dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
std::vector< peer_connection_ptr > peers_needing_next_batch;
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
@ -3527,6 +3711,8 @@ namespace graphene { namespace net { namespace detail {
fc::time_point_sec block_time = block_message_to_process.block.timestamp;
bool disconnect_this_peer = false;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3543,10 +3729,14 @@ namespace graphene { namespace net { namespace detail {
}
peer->clear_old_inventory();
}
}
message_propagation_data propagation_data{message_receive_time, message_validated_time, originating_peer->node_id};
broadcast( block_message_to_process, propagation_data );
_message_cache.block_accepted();
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
if (is_hard_fork_block(block_number) )
@ -3578,6 +3768,7 @@ namespace graphene { namespace net { namespace detail {
#endif
}
}
}
if(rejecting_block_due_hf)
{
@ -3614,10 +3805,12 @@ namespace graphene { namespace net { namespace detail {
disconnect_reason = "You offered me a block that I have deemed to be invalid";
peers_to_disconnect.insert( originating_peer->shared_from_this() );
for (const peer_connection_ptr& peer : _active_connections)
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections) {
if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id)
peers_to_disconnect.insert(peer);
}
}
if (restart_sync_exception)
{
@ -3737,6 +3930,8 @@ namespace graphene { namespace net { namespace detail {
void node_impl::forward_firewall_check_to_next_available_peer(firewall_check_state_data* firewall_check_state)
{
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test
@ -3756,6 +3951,8 @@ namespace graphene { namespace net { namespace detail {
return;
}
}
}
wlog("Unable to forward firewall check for node ${to_check} to any other peers, returning 'unable'",
("to_check", firewall_check_state->endpoint_to_test));
@ -3928,6 +4125,8 @@ namespace graphene { namespace net { namespace detail {
}
fc::time_point now = fc::time_point::now();
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3963,6 +4162,8 @@ namespace graphene { namespace net { namespace detail {
data_for_this_peer.user_data = user_data;
reply.current_connections.emplace_back(data_for_this_peer);
}
}
originating_peer->send_message(reply);
}
@ -4047,6 +4248,7 @@ namespace graphene { namespace net { namespace detail {
void node_impl::start_synchronizing()
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
start_synchronizing_with_peer( peer );
}
@ -4253,9 +4455,19 @@ namespace graphene { namespace net { namespace detail {
// the read loop before it gets an EOF).
// operate off copies of the lists in case they change during iteration
std::list<peer_connection_ptr> all_peers;
boost::push_back(all_peers, _active_connections);
boost::push_back(all_peers, _handshaking_connections);
boost::push_back(all_peers, _closing_connections);
auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); };
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
std::for_each(_active_connections.begin(), _active_connections.end(), p_back);
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back);
}
{
fc::scoped_lock<fc::mutex> lock(_closing_connections.get_mutex());
std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back);
}
for (const peer_connection_ptr& peer : all_peers)
{
@ -4521,9 +4733,7 @@ namespace graphene { namespace net { namespace detail {
// whether the peer is firewalled, we want to disconnect now.
_handshaking_connections.erase(new_peer);
_terminating_connections.erase(new_peer);
assert(_active_connections.find(new_peer) == _active_connections.end());
_active_connections.erase(new_peer);
assert(_closing_connections.find(new_peer) == _closing_connections.end());
_closing_connections.erase(new_peer);
display_current_connections();
@ -4867,18 +5077,25 @@ namespace graphene { namespace net { namespace detail {
peer_connection_ptr node_impl::get_connection_to_endpoint( const fc::ip::endpoint& remote_endpoint )
{
VERIFY_CORRECT_THREAD();
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& active_peer : _active_connections )
{
fc::optional<fc::ip::endpoint> endpoint_for_this_peer( active_peer->get_remote_endpoint() );
if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
return active_peer;
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for( const peer_connection_ptr& handshaking_peer : _handshaking_connections )
{
fc::optional<fc::ip::endpoint> endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() );
if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
return handshaking_peer;
}
}
return peer_connection_ptr();
}
@ -4922,6 +5139,8 @@ namespace graphene { namespace net { namespace detail {
ilog( " number of peers: ${active} active, ${handshaking}, ${closing} closing. attempting to maintain ${desired} - ${maximum} peers",
( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() )
( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) );
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}",
@ -4933,11 +5152,15 @@ namespace graphene { namespace net { namespace detail {
ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" );
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for( const peer_connection_ptr& peer : _handshaking_connections )
{
ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})",
( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) );
}
}
ilog( "--------- MEMORY USAGE ------------" );
ilog( "node._active_sync_requests size: ${size}", ("size", _active_sync_requests.size() ) );
@ -4946,6 +5169,7 @@ namespace graphene { namespace net { namespace detail {
ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) );
ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) );
ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) );
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) );
@ -5043,6 +5267,7 @@ namespace graphene { namespace net { namespace detail {
{
VERIFY_CORRECT_THREAD();
std::vector<peer_status> statuses;
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -5233,10 +5458,13 @@ namespace graphene { namespace net { namespace detail {
_allowed_peers.clear();
_allowed_peers.insert(allowed_peers.begin(), allowed_peers.end());
std::list<peer_connection_ptr> peers_to_disconnect;
if (!_allowed_peers.empty())
for (const peer_connection_ptr& peer : _active_connections)
if (!_allowed_peers.empty()) {
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr &peer : _active_connections) {
if (_allowed_peers.find(peer->node_id) == _allowed_peers.end())
peers_to_disconnect.push_back(peer);
}
}
for (const peer_connection_ptr& peer : peers_to_disconnect)
disconnect_from_peer(peer.get(), "My allowed_peers list has changed, and you're no longer allowed. Bye.");
#endif // ENABLE_P2P_DEBUGGING_API

View file

@ -18,10 +18,40 @@
namespace graphene { namespace peerplays_sidechain {
rpc_client::rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls) :
url(_url),
user(_user),
password(_password),
struct rpc_reply {
uint16_t status;
std::string body;
};
class rpc_connection {
public:
rpc_connection(const rpc_credentials &_credentials, bool _debug_rpc_calls);
std::string send_post_request(std::string method, std::string params, bool show_log);
std::string get_url() const;
protected:
rpc_credentials credentials;
bool debug_rpc_calls;
std::string protocol;
std::string host;
std::string port;
std::string target;
std::string authorization;
uint32_t request_id;
private:
rpc_reply send_post_request(std::string body, bool show_log);
boost::beast::net::io_context ioc;
boost::beast::net::ip::tcp::resolver resolver;
boost::asio::ip::basic_resolver_results<boost::asio::ip::tcp> results;
};
rpc_connection::rpc_connection(const rpc_credentials &_credentials, bool _debug_rpc_calls) :
credentials(_credentials),
debug_rpc_calls(_debug_rpc_calls),
request_id(0),
resolver(ioc) {
@ -31,7 +61,7 @@ rpc_client::rpc_client(std::string _url, std::string _user, std::string _passwor
boost::xpressive::smatch sm;
if (boost::xpressive::regex_search(url, sm, sr)) {
if (boost::xpressive::regex_search(credentials.url, sm, sr)) {
protocol = sm["Protocol"];
if (protocol.empty()) {
protocol = "http";
@ -52,15 +82,19 @@ rpc_client::rpc_client(std::string _url, std::string _user, std::string _passwor
target = "/";
}
authorization = "Basic " + base64_encode(user + ":" + password);
authorization = "Basic " + base64_encode(credentials.user + ":" + credentials.password);
results = resolver.resolve(host, port);
} else {
elog("Invalid URL: ${url}", ("url", url));
elog("Invalid URL: ${url}", ("url", credentials.url));
}
}
std::string rpc_connection::get_url() const {
return credentials.url;
}
std::string rpc_client::retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx) {
if (reply_str.empty()) {
wlog("RPC call ${function}, empty reply string", ("function", __FUNCTION__));
@ -125,7 +159,7 @@ std::string rpc_client::retrieve_value_from_reply(std::string reply_str, std::st
return "";
}
std::string rpc_client::send_post_request(std::string method, std::string params, bool show_log) {
std::string rpc_connection::send_post_request(std::string method, std::string params, bool show_log) {
std::stringstream body;
request_id = request_id + 1;
@ -164,7 +198,7 @@ std::string rpc_client::send_post_request(std::string method, std::string params
return "";
}
rpc_reply rpc_client::send_post_request(std::string body, bool show_log) {
rpc_reply rpc_connection::send_post_request(std::string body, bool show_log) {
// These object is used as a context for ssl connection
boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
@ -239,7 +273,7 @@ rpc_reply rpc_client::send_post_request(std::string body, bool show_log) {
reply.body = rbody;
if (show_log) {
ilog("### Request URL: ${url}", ("url", url));
ilog("### Request URL: ${url}", ("url", credentials.url));
ilog("### Request: ${body}", ("body", body));
ilog("### Response: ${rbody}", ("rbody", rbody));
}
@ -247,4 +281,113 @@ rpc_reply rpc_client::send_post_request(std::string body, bool show_log) {
return reply;
}
rpc_client::rpc_client(sidechain_type _sidechain, const std::vector<rpc_credentials> &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection) :
sidechain(_sidechain),
debug_rpc_calls(_debug_rpc_calls),
simulate_connection_reselection(_simulate_connection_reselection) {
FC_ASSERT(_credentials.size());
for (size_t i = 0; i < _credentials.size(); i++)
connections.push_back(new rpc_connection(_credentials[i], _debug_rpc_calls));
n_active_conn = 0;
if (connections.size() > 1)
schedule_connection_selection();
}
void rpc_client::schedule_connection_selection() {
fc::time_point now = fc::time_point::now();
static const int64_t time_to_next_conn_selection = 10 * 1000 * 1000; // 10 sec
fc::time_point next_wakeup = now + fc::microseconds(time_to_next_conn_selection);
connection_selection_task = fc::schedule([this] {
select_connection();
},
next_wakeup, "SON RPC connection selection");
}
void rpc_client::select_connection() {
FC_ASSERT(connections.size() > 1);
const std::lock_guard<std::mutex> lock(conn_mutex);
static const int t_limit = 5 * 1000 * 1000, // 5 sec
quality_diff_threshold = 10 * 1000; // 10 ms
int best_n = -1;
int best_quality = -1;
std::vector<uint64_t> head_block_numbers;
head_block_numbers.resize(connections.size());
std::vector<int> qualities;
qualities.resize(connections.size());
for (size_t n = 0; n < connections.size(); n++) {
rpc_connection &conn = *connections[n];
int quality = 0;
head_block_numbers[n] = std::numeric_limits<uint64_t>::max();
// ping n'th node
if (debug_rpc_calls)
ilog("### Ping ${sidechain} node #${n}, ${url}", ("sidechain", fc::reflector<sidechain_type>::to_string(sidechain))("n", n)("url", conn.get_url()));
fc::time_point t_sent = fc::time_point::now();
uint64_t head_block_number = ping(conn);
fc::time_point t_received = fc::time_point::now();
int t = (t_received - t_sent).count();
// evaluate n'th node reply quality and switch to it if it's better
if (head_block_number != std::numeric_limits<uint64_t>::max()) {
if (simulate_connection_reselection)
t += rand() % 10;
FC_ASSERT(t != -1);
head_block_numbers[n] = head_block_number;
if (t < t_limit)
quality = t_limit - t; // the less time, the higher quality
// look for the best quality
if (quality > best_quality) {
best_n = n;
best_quality = quality;
}
}
qualities[n] = quality;
}
FC_ASSERT(best_n != -1 && best_quality != -1);
if (best_n != n_active_conn) { // if the best client is not the current one, ...
uint64_t active_head_block_number = head_block_numbers[n_active_conn];
if ((active_head_block_number == std::numeric_limits<uint64_t>::max() // ...and the current one has no known head block...
|| head_block_numbers[best_n] >= active_head_block_number) // ...or the best client's head is more recent than the current, ...
&& best_quality > qualities[n_active_conn] + quality_diff_threshold) { // ...and the new client's quality exceeds current more than by threshold
n_active_conn = best_n; // ...then select new one
if (debug_rpc_calls)
ilog("### Reselected ${sidechain} node to #${n}, ${url}", ("sidechain", fc::reflector<sidechain_type>::to_string(sidechain))("n", n_active_conn)("url", connections[n_active_conn]->get_url()));
}
}
schedule_connection_selection();
}
rpc_connection &rpc_client::get_active_connection() const {
return *connections[n_active_conn];
}
std::string rpc_client::send_post_request(std::string method, std::string params, bool show_log) {
const std::lock_guard<std::mutex> lock(conn_mutex);
return send_post_request(get_active_connection(), method, params, show_log);
}
std::string rpc_client::send_post_request(rpc_connection &conn, std::string method, std::string params, bool show_log) {
return conn.send_post_request(method, params, show_log);
}
rpc_client::~rpc_client() {
try {
if (connection_selection_task.valid())
connection_selection_task.cancel_and_wait(__FUNCTION__);
} catch (fc::canceled_exception &) {
//Expected exception. Move along.
} catch (fc::exception &e) {
edump((e.to_detail_string()));
}
}
}} // namespace graphene::peerplays_sidechain

View file

@ -137,8 +137,9 @@ std::string rlp_encoder::encode_length(int len, int offset) {
std::string rlp_encoder::hex2bytes(const std::string &s) {
std::string dest;
dest.resize(s.size() / 2);
hex2bin(s.c_str(), &dest[0]);
const auto s_final = s.size() % 2 == 0 ? s : "0" + s;
dest.resize(s_final.size() / 2);
hex2bin(s_final.c_str(), &dest[0]);
return dest;
}

View file

@ -2,8 +2,10 @@
#include <curl/curl.h>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <vector>
typedef std::function<uint64_t()> get_fee_func_type;

View file

@ -3,44 +3,52 @@
#include <cstdint>
#include <string>
#include <fc/thread/future.hpp>
#include <fc/thread/thread.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <graphene/peerplays_sidechain/defs.hpp>
namespace graphene { namespace peerplays_sidechain {
struct rpc_reply {
uint16_t status;
std::string body;
class rpc_connection;
struct rpc_credentials {
std::string url;
std::string user;
std::string password;
};
class rpc_client {
public:
rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls);
const sidechain_type sidechain;
rpc_client(sidechain_type _sidechain, const std::vector<rpc_credentials> &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection);
~rpc_client();
protected:
std::string retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx);
std::string retrieve_value_from_reply(std::string reply_str, std::string value_path);
bool debug_rpc_calls;
bool simulate_connection_reselection;
std::string send_post_request(std::string method, std::string params, bool show_log);
std::string url;
std::string user;
std::string password;
bool debug_rpc_calls;
static std::string send_post_request(rpc_connection &conn, std::string method, std::string params, bool show_log);
std::string protocol;
std::string host;
std::string port;
std::string target;
std::string authorization;
uint32_t request_id;
static std::string retrieve_array_value_from_reply(std::string reply_str, std::string array_path, uint32_t idx);
static std::string retrieve_value_from_reply(std::string reply_str, std::string value_path);
private:
rpc_reply send_post_request(std::string body, bool show_log);
std::vector<rpc_connection *> connections;
int n_active_conn;
fc::future<void> connection_selection_task;
std::mutex conn_mutex;
boost::beast::net::io_context ioc;
boost::beast::net::ip::tcp::resolver resolver;
boost::asio::ip::basic_resolver_results<boost::asio::ip::tcp> results;
rpc_connection &get_active_connection() const;
void select_connection();
void schedule_connection_selection();
virtual uint64_t ping(rpc_connection &conn) const = 0;
};
}} // namespace graphene::peerplays_sidechain

View file

@ -16,8 +16,10 @@
namespace graphene { namespace peerplays_sidechain {
class sidechain_net_handler {
protected:
sidechain_net_handler(sidechain_type _sidechain, peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options);
public:
sidechain_net_handler(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options);
virtual ~sidechain_net_handler();
sidechain_type get_sidechain() const;
@ -54,9 +56,9 @@ public:
virtual optional<asset> estimate_withdrawal_transaction_fee() const = 0;
protected:
const sidechain_type sidechain;
peerplays_sidechain_plugin &plugin;
graphene::chain::database &database;
sidechain_type sidechain;
bool debug_rpc_calls;
bool use_bitcoind_client;

View file

@ -98,7 +98,7 @@ protected:
class bitcoin_rpc_client : public bitcoin_client_base, public rpc_client {
public:
public:
bitcoin_rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls);
bitcoin_rpc_client(const std::vector<rpc_credentials> &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection);
uint64_t estimatesmartfee(uint16_t conf_target = 1);
std::vector<info_for_vin> getblock(const block_data &block, int32_t verbosity = 2);
@ -113,6 +113,8 @@ public:
std::string walletlock();
bool walletpassphrase(const std::string &passphrase, uint32_t timeout = 60);
virtual uint64_t ping(rpc_connection &conn) const override;
private:
std::string ip;
std::string user;
@ -214,14 +216,11 @@ public:
virtual optional<asset> estimate_withdrawal_transaction_fee() const override;
private:
std::string bitcoin_node_ip;
std::vector<rpc_credentials> _rpc_credentials;
std::string libbitcoin_server_ip;
uint32_t libbitcoin_block_zmq_port;
uint32_t libbitcoin_trx_zmq_port;
uint32_t bitcoin_node_zmq_port;
uint32_t rpc_port;
std::string rpc_user;
std::string rpc_password;
std::string wallet_name;
std::string wallet_password;

View file

@ -14,7 +14,7 @@ namespace graphene { namespace peerplays_sidechain {
class ethereum_rpc_client : public rpc_client {
public:
ethereum_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls);
ethereum_rpc_client(const std::vector<rpc_credentials> &credentials, bool debug_rpc_calls, bool simulate_connection_reselection);
std::string eth_blockNumber();
std::string eth_get_block_by_number(std::string block_number, bool full_block);
@ -36,6 +36,8 @@ public:
std::string eth_send_raw_transaction(const std::string &params);
std::string eth_get_transaction_receipt(const std::string &params);
std::string eth_get_transaction_by_hash(const std::string &params);
virtual uint64_t ping(rpc_connection &conn) const override;
};
class sidechain_net_handler_ethereum : public sidechain_net_handler {
@ -54,13 +56,9 @@ public:
virtual optional<asset> estimate_withdrawal_transaction_fee() const override;
private:
using bimap_type = boost::bimap<std::string, std::string>;
private:
std::string rpc_url;
std::string rpc_user;
std::string rpc_password;
std::vector<rpc_credentials> _rpc_credentials;
std::string wallet_contract_address;
using bimap_type = boost::bimap<std::string, std::string>;
bimap_type erc20_addresses;
ethereum_rpc_client *rpc_client;

View file

@ -13,7 +13,7 @@ namespace graphene { namespace peerplays_sidechain {
class hive_rpc_client : public rpc_client {
public:
hive_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls);
hive_rpc_client(const std::vector<rpc_credentials> &credentials, bool debug_rpc_calls, bool simulate_connection_reselection);
std::string account_history_api_get_transaction(std::string transaction_id);
std::string block_api_get_block(uint32_t block_number);
@ -30,6 +30,8 @@ public:
std::string get_head_block_time();
std::string get_is_test_net();
std::string get_last_irreversible_block_num();
virtual uint64_t ping(rpc_connection &conn) const override;
};
class sidechain_net_handler_hive : public sidechain_net_handler {
@ -48,9 +50,8 @@ public:
virtual optional<asset> estimate_withdrawal_transaction_fee() const override;
private:
std::string rpc_url;
std::string rpc_user;
std::string rpc_password;
std::vector<rpc_credentials> _rpc_credentials;
std::string wallet_account_name;
hive_rpc_client *rpc_client;

View file

@ -175,13 +175,14 @@ void peerplays_sidechain_plugin_impl::plugin_set_program_options(
cli.add_options()("sidechain-retry-threshold", bpo::value<uint16_t>()->default_value(150), "Sidechain retry throttling threshold");
cli.add_options()("debug-rpc-calls", bpo::value<bool>()->default_value(false), "Outputs RPC calls to console");
cli.add_options()("simulate-rpc-connection-reselection", bpo::value<bool>()->default_value(false), "Simulate RPC connection reselection by altering their response times by a random value");
cli.add_options()("bitcoin-sidechain-enabled", bpo::value<bool>()->default_value(false), "Bitcoin sidechain handler enabled");
cli.add_options()("bitcoin-node-ip", bpo::value<vector<string>>()->composing()->multitoken()->DEFAULT_VALUE_VECTOR("127.0.0.1"), "IP address of Bitcoin node");
cli.add_options()("use-bitcoind-client", bpo::value<bool>()->default_value(false), "Use bitcoind client instead of libbitcoin client");
cli.add_options()("libbitcoin-server-ip", bpo::value<string>()->default_value("127.0.0.1"), "Libbitcoin server IP address");
cli.add_options()("libbitcoin-server-block-zmq-port", bpo::value<uint32_t>()->default_value(9093), "Block ZMQ port of libbitcoin server");
cli.add_options()("libbitcoin-server-trx-zmq-port", bpo::value<uint32_t>()->default_value(9094), "Trx ZMQ port of libbitcoin server");
cli.add_options()("bitcoin-node-ip", bpo::value<string>()->default_value("127.0.0.1"), "IP address of Bitcoin node");
cli.add_options()("bitcoin-node-zmq-port", bpo::value<uint32_t>()->default_value(11111), "ZMQ port of Bitcoin node");
cli.add_options()("bitcoin-node-rpc-port", bpo::value<uint32_t>()->default_value(8332), "RPC port of Bitcoin node");
cli.add_options()("bitcoin-node-rpc-user", bpo::value<string>()->default_value("1"), "Bitcoin RPC user");
@ -192,7 +193,7 @@ void peerplays_sidechain_plugin_impl::plugin_set_program_options(
"Tuple of [Bitcoin public key, Bitcoin private key] (may specify multiple times)");
cli.add_options()("ethereum-sidechain-enabled", bpo::value<bool>()->default_value(false), "Ethereum sidechain handler enabled");
cli.add_options()("ethereum-node-rpc-url", bpo::value<string>()->default_value("127.0.0.1:8545"), "Ethereum node RPC URL [http[s]://]host[:port]");
cli.add_options()("ethereum-node-rpc-url", bpo::value<vector<string>>()->composing()->multitoken()->DEFAULT_VALUE_VECTOR("127.0.0.1:8545"), "Ethereum node RPC URL [http[s]://]host[:port]");
cli.add_options()("ethereum-node-rpc-user", bpo::value<string>(), "Ethereum RPC user");
cli.add_options()("ethereum-node-rpc-password", bpo::value<string>(), "Ethereum RPC password");
cli.add_options()("ethereum-wallet-contract-address", bpo::value<string>(), "Ethereum wallet contract address");
@ -202,7 +203,7 @@ void peerplays_sidechain_plugin_impl::plugin_set_program_options(
"Tuple of [Ethereum public key, Ethereum private key] (may specify multiple times)");
cli.add_options()("hive-sidechain-enabled", bpo::value<bool>()->default_value(false), "Hive sidechain handler enabled");
cli.add_options()("hive-node-rpc-url", bpo::value<string>()->default_value("127.0.0.1:28090"), "Hive node RPC URL [http[s]://]host[:port]");
cli.add_options()("hive-node-rpc-url", bpo::value<vector<string>>()->composing()->multitoken()->DEFAULT_VALUE_VECTOR("127.0.0.1:28090"), "Hive node RPC URL [http[s]://]host[:port]");
cli.add_options()("hive-node-rpc-user", bpo::value<string>(), "Hive node RPC user");
cli.add_options()("hive-node-rpc-password", bpo::value<string>(), "Hive node RPC password");
cli.add_options()("hive-wallet-account-name", bpo::value<string>(), "Hive wallet account name");
@ -291,6 +292,9 @@ void peerplays_sidechain_plugin_impl::plugin_initialize(const boost::program_opt
if (sidechain_enabled_peerplays && !config_ready_peerplays) {
wlog("Haven't set up Peerplays sidechain parameters");
}
if (options.at("simulate-rpc-connection-reselection").as<bool>())
ilog("### RPC connection reselection will be simulated");
}
void peerplays_sidechain_plugin_impl::plugin_startup() {

View file

@ -9,7 +9,8 @@
namespace graphene { namespace peerplays_sidechain {
sidechain_net_handler::sidechain_net_handler(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) :
sidechain_net_handler::sidechain_net_handler(sidechain_type _sidechain, peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) :
sidechain(_sidechain),
plugin(_plugin),
database(_plugin.database()) {
@ -679,7 +680,8 @@ void sidechain_net_handler::on_applied_block(const signed_block &b) {
const bool is_tracked_asset =
((sidechain == sidechain_type::bitcoin) && (transfer_op.amount.asset_id == gpo.parameters.btc_asset())) ||
((sidechain == sidechain_type::ethereum) && (transfer_op.amount.asset_id == gpo.parameters.eth_asset())) ||
(sidechain == sidechain_type::ethereum) ||
((sidechain == sidechain_type::ethereum) && (transfer_op.amount.asset_id != gpo.parameters.btc_asset())
&& (transfer_op.amount.asset_id != gpo.parameters.hbd_asset()) && (transfer_op.amount.asset_id != gpo.parameters.hive_asset())) ||
((sidechain == sidechain_type::hive) && (transfer_op.amount.asset_id == gpo.parameters.hbd_asset())) ||
((sidechain == sidechain_type::hive) && (transfer_op.amount.asset_id == gpo.parameters.hive_asset()));

View file

@ -25,8 +25,8 @@ namespace graphene { namespace peerplays_sidechain {
// =============================================================================
bitcoin_rpc_client::bitcoin_rpc_client(std::string _url, std::string _user, std::string _password, bool _debug_rpc_calls) :
rpc_client(_url, _user, _password, _debug_rpc_calls) {
bitcoin_rpc_client::bitcoin_rpc_client(const std::vector<rpc_credentials> &_credentials, bool _debug_rpc_calls, bool _simulate_connection_reselection) :
rpc_client(sidechain_type::bitcoin, _credentials, _debug_rpc_calls, _simulate_connection_reselection) {
}
uint64_t bitcoin_rpc_client::estimatesmartfee(uint16_t conf_target) {
@ -500,6 +500,13 @@ std::string bitcoin_libbitcoin_client::sendrawtransaction(const std::string &tx_
return res;
}
uint64_t bitcoin_rpc_client::ping(rpc_connection &conn) const {
std::string str = send_post_request(conn, "getblockcount", "[]", debug_rpc_calls);
if (str.length() > 0)
return std::stoll(str);
return std::numeric_limits<uint64_t>::max();
}
// =============================================================================
zmq_listener::zmq_listener(std::string _ip, uint32_t _zmq_block_port, uint32_t _zmq_trx_port) :
@ -678,12 +685,18 @@ void zmq_listener_libbitcoin::handle_block() {
// =============================================================================
sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) :
sidechain_net_handler(_plugin, options) {
sidechain = sidechain_type::bitcoin;
sidechain_net_handler(sidechain_type::bitcoin, _plugin, options) {
if (options.count("debug-rpc-calls")) {
debug_rpc_calls = options.at("debug-rpc-calls").as<bool>();
}
bool simulate_connection_reselection = options.at("simulate-rpc-connection-reselection").as<bool>();
std::vector<std::string> ips = options.at("bitcoin-node-ip").as<std::vector<std::string>>();
bitcoin_node_zmq_port = options.at("bitcoin-node-zmq-port").as<uint32_t>();
uint32_t rpc_port = options.at("bitcoin-node-rpc-port").as<uint32_t>();
std::string rpc_user = options.at("bitcoin-node-rpc-user").as<std::string>();
std::string rpc_password = options.at("bitcoin-node-rpc-password").as<std::string>();
if (options.count("use-bitcoind-client")) {
use_bitcoind_client = options.at("use-bitcoind-client").as<bool>();
@ -693,11 +706,6 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
libbitcoin_block_zmq_port = options.at("libbitcoin-server-block-zmq-port").as<uint32_t>();
libbitcoin_trx_zmq_port = options.at("libbitcoin-server-trx-zmq-port").as<uint32_t>();
bitcoin_node_ip = options.at("bitcoin-node-ip").as<std::string>();
bitcoin_node_zmq_port = options.at("bitcoin-node-zmq-port").as<uint32_t>();
rpc_port = options.at("bitcoin-node-rpc-port").as<uint32_t>();
rpc_user = options.at("bitcoin-node-rpc-user").as<std::string>();
rpc_password = options.at("bitcoin-node-rpc-password").as<std::string>();
wallet_name = "";
if (options.count("bitcoin-wallet-name")) {
wallet_name = options.at("bitcoin-wallet-name").as<std::string>();
@ -720,17 +728,27 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
}
if (use_bitcoind_client) {
std::string url = bitcoin_node_ip + ":" + std::to_string(rpc_port);
for (size_t i = 0; i < ips.size(); i++) {
std::string ip = ips[i];
std::string url = ip + ":" + std::to_string(rpc_port);
if (!wallet_name.empty()) {
url = url + "/wallet/" + wallet_name;
}
bitcoin_client = std::unique_ptr<bitcoin_rpc_client>(new bitcoin_rpc_client(url, rpc_user, rpc_password, debug_rpc_calls));
rpc_credentials creds;
creds.url = url;
creds.user = rpc_user;
creds.password = rpc_password;
_rpc_credentials.push_back(creds);
}
FC_ASSERT(!_rpc_credentials.empty());
bitcoin_client = std::unique_ptr<bitcoin_rpc_client>(new bitcoin_rpc_client(_rpc_credentials, debug_rpc_calls, simulate_connection_reselection));
if (!wallet_name.empty()) {
bitcoin_client->loadwallet(wallet_name);
}
listener = std::unique_ptr<zmq_listener>(new zmq_listener(bitcoin_node_ip, bitcoin_node_zmq_port));
listener = std::unique_ptr<zmq_listener>(new zmq_listener(ips[0], bitcoin_node_zmq_port));
} else {
bitcoin_client = std::unique_ptr<bitcoin_libbitcoin_client>(new bitcoin_libbitcoin_client(libbitcoin_server_ip));
@ -750,7 +768,6 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
bitcoin_client->getnetworkinfo();
listener->start();
listener->block_event_received.connect([this](const block_data &block_event_data) {
std::thread(&sidechain_net_handler_bitcoin::block_handle_event, this, block_event_data).detach();
});
@ -759,6 +776,8 @@ sidechain_net_handler_bitcoin::sidechain_net_handler_bitcoin(peerplays_sidechain
std::thread(&sidechain_net_handler_bitcoin::trx_handle_event, this, trx_event_data).detach();
});
listener->start();
database.changed_objects.connect([this](const vector<object_id_type> &ids, const flat_set<account_id_type> &accounts) {
on_changed_objects(ids, accounts);
});
@ -778,7 +797,7 @@ sidechain_net_handler_bitcoin::~sidechain_net_handler_bitcoin() {
bool sidechain_net_handler_bitcoin::process_proposal(const proposal_object &po) {
// ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain)));
ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain)));
bool should_approve = false;
@ -855,7 +874,7 @@ bool sidechain_net_handler_bitcoin::process_proposal(const proposal_object &po)
std::string op_tx_str = op_obj_idx_1.get<sidechain_transaction_create_operation>().transaction;
const auto &st_idx = database.get_index_type<sidechain_transaction_index>().indices().get<by_object_id>();
const auto st = st_idx.find(obj_id);
const auto st = st_idx.find(object_id);
if (st == st_idx.end()) {
std::string tx_str = "";
@ -1075,6 +1094,10 @@ void sidechain_net_handler_bitcoin::process_primary_wallet() {
return;
}
if (!plugin.can_son_participate(sidechain, chain::operation::tag<chain::son_wallet_update_operation>::value, op_id)) {
return;
}
const chain::global_property_object &gpo = database.get_global_properties();
const auto &active_sons = gpo.active_sons.at(sidechain);

View file

@ -25,8 +25,8 @@
namespace graphene { namespace peerplays_sidechain {
ethereum_rpc_client::ethereum_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls) :
rpc_client(url, user_name, password, debug_rpc_calls) {
ethereum_rpc_client::ethereum_rpc_client(const std::vector<rpc_credentials> &credentials, bool debug_rpc_calls, bool simulate_connection_reselection) :
rpc_client(sidechain_type::ethereum, credentials, debug_rpc_calls, simulate_connection_reselection) {
}
std::string ethereum_rpc_client::eth_blockNumber() {
@ -126,20 +126,29 @@ std::string ethereum_rpc_client::eth_get_transaction_by_hash(const std::string &
return send_post_request("eth_getTransactionByHash", "[\"" + params + "\"]", debug_rpc_calls);
}
uint64_t ethereum_rpc_client::ping(rpc_connection &conn) const {
std::string reply = send_post_request(conn, "eth_blockNumber", "", debug_rpc_calls);
if (!reply.empty())
return ethereum::from_hex<uint64_t>(retrieve_value_from_reply(reply, ""));
return std::numeric_limits<uint64_t>::max();
}
sidechain_net_handler_ethereum::sidechain_net_handler_ethereum(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) :
sidechain_net_handler(_plugin, options) {
sidechain = sidechain_type::ethereum;
sidechain_net_handler(sidechain_type::ethereum, _plugin, options) {
if (options.count("debug-rpc-calls")) {
debug_rpc_calls = options.at("debug-rpc-calls").as<bool>();
}
bool simulate_connection_reselection = options.at("simulate-rpc-connection-reselection").as<bool>();
rpc_url = options.at("ethereum-node-rpc-url").as<std::string>();
std::vector<std::string> rpc_urls = options.at("ethereum-node-rpc-url").as<std::vector<std::string>>();
std::string rpc_user;
if (options.count("ethereum-node-rpc-user")) {
rpc_user = options.at("ethereum-node-rpc-user").as<std::string>();
} else {
rpc_user = "";
}
std::string rpc_password;
if (options.count("ethereum-node-rpc-password")) {
rpc_password = options.at("ethereum-node-rpc-password").as<std::string>();
} else {
@ -175,18 +184,27 @@ sidechain_net_handler_ethereum::sidechain_net_handler_ethereum(peerplays_sidecha
}
}
rpc_client = new ethereum_rpc_client(rpc_url, rpc_user, rpc_password, debug_rpc_calls);
for (size_t i = 0; i < rpc_urls.size(); i++) {
rpc_credentials creds;
creds.url = rpc_urls[i];
creds.user = rpc_user;
creds.password = rpc_password;
_rpc_credentials.push_back(creds);
}
FC_ASSERT(!_rpc_credentials.empty());
rpc_client = new ethereum_rpc_client(_rpc_credentials, debug_rpc_calls, simulate_connection_reselection);
const std::string chain_id_str = rpc_client->get_chain_id();
if (chain_id_str.empty()) {
elog("No Ethereum node running at ${url}", ("url", rpc_url));
elog("No Ethereum node running at ${url}", ("url", _rpc_credentials[0].url));
FC_ASSERT(false);
}
chain_id = std::stoll(chain_id_str);
const std::string network_id_str = rpc_client->get_network_id();
if (network_id_str.empty()) {
elog("No Ethereum node running at ${url}", ("url", rpc_url));
elog("No Ethereum node running at ${url}", ("url", _rpc_credentials[0].url));
FC_ASSERT(false);
}
network_id = std::stoll(network_id_str);
@ -205,6 +223,7 @@ sidechain_net_handler_ethereum::~sidechain_net_handler_ethereum() {
}
bool sidechain_net_handler_ethereum::process_proposal(const proposal_object &po) {
ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain)));
bool should_approve = false;
@ -263,7 +282,7 @@ bool sidechain_net_handler_ethereum::process_proposal(const proposal_object &po)
const std::string op_tx_str = op_obj_idx_1.get<sidechain_transaction_create_operation>().transaction;
const auto &st_idx = database.get_index_type<sidechain_transaction_index>().indices().get<by_object_id>();
const auto st = st_idx.find(obj_id);
const auto st = st_idx.find(object_id);
if (st == st_idx.end()) {
std::string tx_str = "";
@ -665,13 +684,18 @@ std::string sidechain_net_handler_ethereum::send_sidechain_transaction(const sid
const ethereum::signature_encoder encoder{function_signature};
#ifdef SEND_RAW_TRANSACTION
const auto data = encoder.encode(transactions);
const std::string params = "[{\"from\":\"" + ethereum::add_0x(public_key) + "\", \"to\":\"" + wallet_contract_address + "\", \"data\":\"" + data + "\"}]";
ethereum::raw_transaction raw_tr;
raw_tr.nonce = rpc_client->get_nonce(ethereum::add_0x(public_key));
raw_tr.gas_price = rpc_client->get_gas_price();
raw_tr.gas_limit = rpc_client->get_estimate_gas(params);
if (raw_tr.gas_limit.empty())
raw_tr.gas_limit = rpc_client->get_gas_limit();
raw_tr.to = wallet_contract_address;
raw_tr.value = "";
raw_tr.data = encoder.encode(transactions);
raw_tr.data = data;
raw_tr.chain_id = ethereum::add_0x(ethereum::to_hex(chain_id));
const auto sign_tr = raw_tr.sign(get_private_key(public_key));
@ -785,7 +809,7 @@ optional<asset> sidechain_net_handler_ethereum::estimate_withdrawal_transaction_
}
const auto &public_key = son->sidechain_public_keys.at(sidechain);
const auto data = ethereum::withdrawal_encoder::encode(public_key, 1 * 10000000000, son_wallet_withdraw_id_type{0}.operator object_id_type().operator std::string());
const auto data = ethereum::withdrawal_encoder::encode(public_key, boost::multiprecision::uint256_t{1} * boost::multiprecision::uint256_t{10000000000}, "0");
const std::string params = "[{\"from\":\"" + ethereum::add_0x(public_key) + "\", \"to\":\"" + wallet_contract_address + "\", \"data\":\"" + data + "\"}]";
const auto estimate_gas = ethereum::from_hex<int64_t>(rpc_client->get_estimate_gas(params));
@ -808,14 +832,14 @@ std::string sidechain_net_handler_ethereum::create_primary_wallet_transaction(co
std::string sidechain_net_handler_ethereum::create_withdrawal_transaction(const son_wallet_withdraw_object &swwo) {
if (swwo.withdraw_currency == "ETH") {
return ethereum::withdrawal_encoder::encode(ethereum::remove_0x(swwo.withdraw_address), swwo.withdraw_amount.value * 10000000000, swwo.id.operator std::string());
return ethereum::withdrawal_encoder::encode(ethereum::remove_0x(swwo.withdraw_address), boost::multiprecision::uint256_t{swwo.withdraw_amount.value} * boost::multiprecision::uint256_t{10000000000}, swwo.id.operator std::string());
} else {
const auto it = erc20_addresses.left.find(swwo.withdraw_currency);
if (it == erc20_addresses.left.end()) {
elog("No erc-20 token: ${symbol}", ("symbol", swwo.withdraw_currency));
return "";
}
return ethereum::withdrawal_erc20_encoder::encode(ethereum::remove_0x(it->second), ethereum::remove_0x(swwo.withdraw_address), swwo.withdraw_amount.value, swwo.id.operator std::string());
return ethereum::withdrawal_erc20_encoder::encode(ethereum::remove_0x(it->second), ethereum::remove_0x(swwo.withdraw_address), boost::multiprecision::uint256_t{swwo.withdraw_amount.value}, swwo.id.operator std::string());
}
return "";
@ -890,8 +914,9 @@ void sidechain_net_handler_ethereum::handle_event(const std::string &block_numbe
const boost::property_tree::ptree tx = tx_child.second;
tx_idx = tx_idx + 1;
const std::string from = tx.get<std::string>("from");
const std::string to = tx.get<std::string>("to");
std::string from = tx.get<std::string>("from");
std::transform(from.begin(), from.end(), from.begin(), ::tolower);
std::string cmp_to = to;
std::transform(cmp_to.begin(), cmp_to.end(), cmp_to.begin(), ::toupper);

View file

@ -30,8 +30,8 @@
namespace graphene { namespace peerplays_sidechain {
hive_rpc_client::hive_rpc_client(const std::string &url, const std::string &user_name, const std::string &password, bool debug_rpc_calls) :
rpc_client(url, user_name, password, debug_rpc_calls) {
hive_rpc_client::hive_rpc_client(const std::vector<rpc_credentials> &credentials, bool debug_rpc_calls, bool simulate_connection_reselection) :
rpc_client(sidechain_type::hive, credentials, debug_rpc_calls, simulate_connection_reselection) {
}
std::string hive_rpc_client::account_history_api_get_transaction(std::string transaction_id) {
@ -112,20 +112,34 @@ std::string hive_rpc_client::get_last_irreversible_block_num() {
return retrieve_value_from_reply(reply_str, "last_irreversible_block_num");
}
uint64_t hive_rpc_client::ping(rpc_connection &conn) const {
const std::string reply = send_post_request(conn, "database_api.get_dynamic_global_properties", "", debug_rpc_calls);
if (!reply.empty()) {
std::stringstream ss(reply);
boost::property_tree::ptree json;
boost::property_tree::read_json(ss, json);
if (json.count("result"))
return json.get<uint64_t>("result.head_block_number");
}
return std::numeric_limits<uint64_t>::max();
}
sidechain_net_handler_hive::sidechain_net_handler_hive(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) :
sidechain_net_handler(_plugin, options) {
sidechain = sidechain_type::hive;
sidechain_net_handler(sidechain_type::hive, _plugin, options) {
if (options.count("debug-rpc-calls")) {
debug_rpc_calls = options.at("debug-rpc-calls").as<bool>();
}
bool simulate_connection_reselection = options.at("simulate-rpc-connection-reselection").as<bool>();
rpc_url = options.at("hive-node-rpc-url").as<std::string>();
std::vector<std::string> rpc_urls = options.at("hive-node-rpc-url").as<std::vector<std::string>>();
std::string rpc_user;
if (options.count("hive-rpc-user")) {
rpc_user = options.at("hive-rpc-user").as<std::string>();
} else {
rpc_user = "";
}
std::string rpc_password;
if (options.count("hive-rpc-password")) {
rpc_password = options.at("hive-rpc-password").as<std::string>();
} else {
@ -146,11 +160,20 @@ sidechain_net_handler_hive::sidechain_net_handler_hive(peerplays_sidechain_plugi
}
}
rpc_client = new hive_rpc_client(rpc_url, rpc_user, rpc_password, debug_rpc_calls);
for (size_t i = 0; i < rpc_urls.size(); i++) {
rpc_credentials creds;
creds.url = rpc_urls[i];
creds.user = rpc_user;
creds.password = rpc_password;
_rpc_credentials.push_back(creds);
}
FC_ASSERT(!_rpc_credentials.empty());
rpc_client = new hive_rpc_client(_rpc_credentials, debug_rpc_calls, simulate_connection_reselection);
const std::string chain_id_str = rpc_client->get_chain_id();
if (chain_id_str.empty()) {
elog("No Hive node running at ${url}", ("url", rpc_url));
elog("No Hive node running at ${url}", ("url", _rpc_credentials[0].url));
FC_ASSERT(false);
}
chain_id = chain_id_type(chain_id_str);
@ -180,7 +203,8 @@ sidechain_net_handler_hive::~sidechain_net_handler_hive() {
}
bool sidechain_net_handler_hive::process_proposal(const proposal_object &po) {
//ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain)));
ilog("Proposal to process: ${po}, SON id ${son_id}", ("po", po.id)("son_id", plugin.get_current_son_id(sidechain)));
bool should_approve = false;
@ -238,7 +262,7 @@ bool sidechain_net_handler_hive::process_proposal(const proposal_object &po) {
std::string op_tx_str = op_obj_idx_1.get<sidechain_transaction_create_operation>().transaction;
const auto &st_idx = database.get_index_type<sidechain_transaction_index>().indices().get<by_object_id>();
const auto st = st_idx.find(obj_id);
const auto st = st_idx.find(object_id);
if (st == st_idx.end()) {
std::string tx_str = "";
@ -499,6 +523,10 @@ void sidechain_net_handler_hive::process_primary_wallet() {
return;
}
if (!plugin.can_son_participate(sidechain, chain::operation::tag<chain::son_wallet_update_operation>::value, op_id)) {
return;
}
const chain::global_property_object &gpo = database.get_global_properties();
const auto &active_sons = gpo.active_sons.at(sidechain);
@ -577,7 +605,7 @@ void sidechain_net_handler_hive::process_primary_wallet() {
stc_op.object_id = op_id;
stc_op.sidechain = sidechain;
stc_op.transaction = tx_str;
for (const auto &signer : gpo.active_sons.at(sidechain)) {
for (const auto &signer : signers) {
son_info si;
si.son_id = signer.son_id;
si.weight = signer.weight;
@ -639,6 +667,11 @@ void sidechain_net_handler_hive::process_sidechain_addresses() {
}
bool sidechain_net_handler_hive::process_deposit(const son_wallet_deposit_object &swdo) {
if (proposal_exists(chain::operation::tag<chain::son_wallet_deposit_process_operation>::value, swdo.id)) {
return false;
}
const chain::global_property_object &gpo = database.get_global_properties();
price asset_price;
@ -685,6 +718,11 @@ bool sidechain_net_handler_hive::process_deposit(const son_wallet_deposit_object
}
bool sidechain_net_handler_hive::process_withdrawal(const son_wallet_withdraw_object &swwo) {
if (proposal_exists(chain::operation::tag<chain::son_wallet_withdraw_process_operation>::value, swwo.id)) {
return false;
}
const chain::global_property_object &gpo = database.get_global_properties();
//=====

View file

@ -23,8 +23,7 @@
namespace graphene { namespace peerplays_sidechain {
sidechain_net_handler_peerplays::sidechain_net_handler_peerplays(peerplays_sidechain_plugin &_plugin, const boost::program_options::variables_map &options) :
sidechain_net_handler(_plugin, options) {
sidechain = sidechain_type::peerplays;
sidechain_net_handler(sidechain_type::peerplays, _plugin, options) {
//const auto &assets_by_symbol = database.get_index_type<asset_index>().indices().get<by_symbol>();
//const auto get_asset_id = [&assets_by_symbol](const string &symbol) {
// auto asset_itr = assets_by_symbol.find(symbol);

View file

@ -2773,12 +2773,21 @@ public:
FC_ASSERT(son_obj, "Account ${son} is not registered as a son", ("son", son));
FC_ASSERT(sidechain == sidechain_type::bitcoin || sidechain == sidechain_type::hive || sidechain == sidechain_type::ethereum, "Unexpected sidechain type");
bool update_vote_time = false;
if (approve)
{
FC_ASSERT(son_obj->get_sidechain_vote_id(sidechain).valid(), "Invalid vote id, sidechain: ${sidechain}, son: ${son}", ("sidechain", sidechain)("son", *son_obj));
account_id_type stake_account = get_account_id(voting_account);
const auto gpos_info = _remote_db->get_gpos_info(stake_account);
const auto vesting_subperiod = _remote_db->get_global_properties().parameters.gpos_subperiod();
const auto gpos_start_time = fc::time_point_sec(_remote_db->get_global_properties().parameters.gpos_period_start());
const auto subperiod_start_time = gpos_start_time.sec_since_epoch() + (gpos_info.current_subperiod - 1) * vesting_subperiod;
auto insert_result = voting_account_object.options.votes.insert(*son_obj->get_sidechain_vote_id(sidechain));
if (!insert_result.second)
FC_THROW("Account ${account} has already voted for son ${son} for sidechain ${sidechain}", ("account", voting_account)("son", son)("sidechain", sidechain));
if (!insert_result.second && (gpos_info.last_voted_time.sec_since_epoch() >= subperiod_start_time))
FC_THROW("Account ${account} was already voting for son ${son} in the current GPOS sub-period", ("account", voting_account)("son", son));
else
update_vote_time = true; //Allow user to vote in each sub-period(Update voting time, which is reference in calculating VF)
}
else
{
@ -2787,9 +2796,11 @@ public:
if (!votes_removed)
FC_THROW("Account ${account} has already unvoted for son ${son} for sidechain ${sidechain}", ("account", voting_account)("son", son)("sidechain", sidechain));
}
account_update_operation account_update_op;
account_update_op.account = voting_account_object.id;
account_update_op.new_options = voting_account_object.options;
account_update_op.extensions.value.update_last_voting_time = update_vote_time;
signed_transaction tx;
tx.operations.push_back( account_update_op );

View file

@ -740,6 +740,19 @@ BOOST_AUTO_TEST_CASE( update_son_votes_test )
sidechain_type::ethereum, 0, true);
BOOST_CHECK(generate_maintenance_block());
// Vote for less SONs than num_son (2 votes, but num_son is 3)
accepted.clear();
rejected.clear();
accepted.push_back("son1account");
accepted.push_back("son2account");
BOOST_CHECK_THROW(update_votes_tx = con.wallet_api_ptr->update_son_votes("nathan", accepted, rejected,
sidechain_type::bitcoin, 3, true), fc::exception);
BOOST_CHECK_THROW(update_votes_tx = con.wallet_api_ptr->update_son_votes("nathan", accepted, rejected,
sidechain_type::hive, 3, true), fc::exception);
BOOST_CHECK_THROW(update_votes_tx = con.wallet_api_ptr->update_son_votes("nathan", accepted, rejected,
sidechain_type::ethereum, 3, true), fc::exception);
generate_block();
// Verify the votes
son1_obj = con.wallet_api_ptr->get_son("son1account");
son1_end_votes = son1_obj.total_votes;

View file

@ -193,6 +193,34 @@ BOOST_AUTO_TEST_CASE(tickets_purchase_fail_test)
}
}
BOOST_AUTO_TEST_CASE(tickets_purchase_overflow)
{
try
{
nft_metadata_id_type test_nft_md_id = db.get_index<nft_metadata_object>().get_next_id();
INVOKE(create_lottery_nft_md_test);
auto &test_nft_md_obj = test_nft_md_id(db);
nft_lottery_token_purchase_operation tpo;
tpo.fee = asset();
tpo.buyer = account_id_type();
tpo.lottery_id = test_nft_md_obj.id;
tpo.tickets_to_buy = 9223372036854775800; // Large number so that the overall amount overflows
trx.operations.push_back(tpo);
BOOST_REQUIRE_THROW(PUSH_TX(db, trx, ~0), fc::overflow_exception);
trx.operations.clear();
tpo.tickets_to_buy = -2; // Negative value should also be rejected
trx.operations.push_back(tpo);
BOOST_REQUIRE_THROW(PUSH_TX(db, trx, ~0), fc::exception);
}
catch (fc::exception &e)
{
edump((e.to_detail_string()));
throw;
}
}
BOOST_AUTO_TEST_CASE(lottery_end_by_stage_test)
{
try