Restore much of the syncing code from BitShares 0.x, and add optimizations that prevent us from attempting to sync with peers whose blockchains diverged from ours many blocks ago, which also reduces the size of sync messages. Add logging.
parent b2c23cd37b
commit b6bba74301

7 changed files with 278 additions and 94 deletions

@@ -47,6 +47,8 @@
 #include <fc/log/logger.hpp>
+#include <fc/log/logger_config.hpp>
 
+#include <boost/range/adaptor/reversed.hpp>
 
 namespace graphene { namespace app {
 using net::item_hash_t;
 using net::item_id;

@@ -398,6 +400,13 @@ namespace detail {
             FC_THROW( "Invalid Message Type" );
       }

+      bool is_included_block(const block_id_type& block_id)
+      {
+         uint32_t block_num = block_header::num_from_id(block_id);
+         block_id_type block_id_in_preferred_chain = _chain_db->get_block_id_for_num(block_num);
+         return block_id == block_id_in_preferred_chain;
+      }
+
       /**
        * Assuming all data elements are ordered in some way, this method should
        * return up to limit ids that occur *after* the last ID in synopsis that

@@ -407,12 +416,10 @@ namespace detail {
        * in our blockchain after the last item returned in the result,
        * or 0 if the result contains the last item in the blockchain
        */
-      virtual std::vector<item_hash_t> get_item_ids(uint32_t item_type,
-                                                    const std::vector<item_hash_t>& blockchain_synopsis,
-                                                    uint32_t& remaining_item_count,
-                                                    uint32_t limit) override
+      virtual std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
+                                                     uint32_t& remaining_item_count,
+                                                     uint32_t limit) override
       { try {
-         FC_ASSERT( item_type == graphene::net::block_message_type );
          vector<block_id_type> result;
          remaining_item_count = 0;
          if( _chain_db->head_block_num() == 0 )

@@ -420,18 +427,16 @@ namespace detail {

          result.reserve(limit);
          block_id_type last_known_block_id;
-         auto itr = blockchain_synopsis.rbegin();
-         while( itr != blockchain_synopsis.rend() )
-         {
-            if( _chain_db->is_known_block(*itr) || *itr == block_id_type() )
-            {
-               last_known_block_id = *itr;
-               break;
-            }
-            ++itr;
-         }

-         for( auto num = block_header::num_from_id(last_known_block_id);
+         for (const item_hash_t& block_id_in_synopsis : boost::adaptors::reverse(blockchain_synopsis))
+            if (block_id_in_synopsis == block_id_type() ||
+                (_chain_db->is_known_block(block_id_in_synopsis) && is_included_block(block_id_in_synopsis)))
+            {
+               last_known_block_id = block_id_in_synopsis;
+               break;
+            }
+
+         for( uint32_t num = block_header::num_from_id(last_known_block_id);
               num <= _chain_db->head_block_num() && result.size() < limit;
               ++num )
             if( num > 0 )

@@ -468,38 +473,156 @@ namespace detail {
      }

      /**
-      * Returns a synopsis of the blockchain used for syncing.
-      * This consists of a list of selected item hashes from our current preferred
-      * blockchain, exponentially falling off into the past.  Horrible explanation.
+      * Returns a synopsis of the blockchain used for syncing.  This consists of a list of
+      * block hashes at intervals exponentially increasing towards the genesis block.
+      * When syncing to a peer, the peer uses this data to determine if we're on the same
+      * fork as they are, and if not, what blocks they need to send us to get us on their
+      * fork.
       *
-      * If the blockchain is empty, it will return the empty list.
-      * If the blockchain has one block, it will return a list containing just that block.
-      * If it contains more than one block:
-      * the first element in the list will be the hash of the genesis block
-      * the second element will be the hash of an item at the half way point in the blockchain
-      * the third will be ~3/4 of the way through the block chain
-      * the fourth will be at ~7/8...
-      *   &c.
-      * the last item in the list will be the hash of the most recent block on our preferred chain
+      * In the over-simplified case, this is a straightforward synopsis of our current
+      * preferred blockchain; when we first connect up to a peer, this is what we will be sending.
+      * It looks like this:
+      *   If the blockchain is empty, it will return the empty list.
+      *   If the blockchain has one block, it will return a list containing just that block.
+      *   If it contains more than one block:
+      *     the first element in the list will be the hash of the highest numbered block that
+      *         we cannot undo
+      *     the second element will be the hash of an item at the half way point in the undoable
+      *         segment of the blockchain
+      *     the third will be ~3/4 of the way through the undoable segment of the block chain
+      *     the fourth will be at ~7/8...
+      *       &c.
+      *     the last item in the list will be the hash of the most recent block on our preferred chain
+      *   so if the blockchain had 26 blocks labeled a - z, the synopsis would be:
+      *     a n u x z
+      *   the idea being that by sending a small (<30) number of block ids, we can summarize a huge
+      *   blockchain.  The block ids are more dense near the end of the chain because we are
+      *   more likely to be almost in sync when we first connect, and forks are likely to be short.
+      *   If the peer we're syncing with in our example is on a fork that started at block 'v',
+      *   then they will reply to our synopsis with a list of all blocks starting from block 'u',
+      *   the last block they know that we had in common.
+      *
+      * In the real code, there are several complications.
+      *
+      * First, as an optimization, we don't usually send a synopsis of the entire blockchain, we
+      * send a synopsis of only the segment of the blockchain that we have undo data for.  If their
+      * fork doesn't build off of something in our undo history, we would be unable to switch, so there's
+      * no reason to fetch the blocks.
+      *
+      * Second, when a peer replies to our initial synopsis and gives us a list of the blocks they think
+      * we are missing, they only send a chunk of a few thousand blocks at once.  After we get those
+      * block ids, we need to request more blocks by sending another synopsis (we can't just say "send me
+      * the next 2000 ids" because they may have switched forks themselves and they don't track what
+      * they've sent us).  For faster performance, we want to get a fairly long list of block ids first,
+      * then start downloading the blocks.
+      * The peer doesn't handle these follow-up block id requests any differently from the initial request;
+      * it treats the synopsis we send as our blockchain and bases its response entirely off that.  So to
+      * get the response we want (the next chunk of block ids following the last one they sent us, or,
+      * failing that, the shortest fork off of the last list of block ids they sent), we need to construct
+      * a synopsis as if our blockchain were made up of:
+      *    1. the blocks in our block chain up to the fork point (if there is a fork) or the head block (if no fork)
+      *    2. the blocks we've already pushed from their fork (if there's a fork)
+      *    3. the block ids they've previously sent us
+      * Segment 3 is handled in the p2p code; it just tells us the number of blocks it has (in
+      * number_of_blocks_after_reference_point) so we can leave space in the synopsis for them.
+      * We're responsible for constructing the synopsis of Segments 1 and 2 from our active blockchain and
+      * fork database.  The reference_point parameter is the last block from that peer that has been
+      * successfully pushed to the blockchain, so that tells us whether the peer is on a fork or on
+      * the main chain.
       */
-      virtual std::vector<item_hash_t> get_blockchain_synopsis(uint32_t item_type,
-                                                               const graphene::net::item_hash_t& reference_point,
+      virtual std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
                                                                uint32_t number_of_blocks_after_reference_point) override
      { try {
-         std::vector<item_hash_t> result;
-         result.reserve(30);
-         uint32_t head_block_num = _chain_db->head_block_num();
-         result.push_back(_chain_db->head_block_id());
-         uint32_t current = 1;
-         while( current < head_block_num )
-         {
-            result.push_back(_chain_db->get_block_id_for_num(head_block_num - current));
-            current = current*2;
-         }
-         std::reverse( result.begin(), result.end() );
-         idump((reference_point)(number_of_blocks_after_reference_point)(result));
-         return result;
-      } FC_CAPTURE_AND_RETHROW( (reference_point)(number_of_blocks_after_reference_point) ) }
+         std::vector<item_hash_t> synopsis;
+         synopsis.reserve(30);
+         uint32_t high_block_num;
+         uint32_t non_fork_high_block_num;
+         uint32_t low_block_num = _chain_db->last_non_undoable_block_num();
+         std::vector<block_id_type> fork_history;
+
+         if (reference_point != item_hash_t())
+         {
+            // the node is asking for a summary of the block chain up to a specified
+            // block, which may or may not be on a fork
+            // for now, assume it's not on a fork
+            if (is_included_block(reference_point))
+            {
+               // reference_point is a block we know about and is on the main chain
+               uint32_t reference_point_block_num = block_header::num_from_id(reference_point);
+               assert(reference_point_block_num > 0);
+               high_block_num = reference_point_block_num;
+               non_fork_high_block_num = high_block_num;
+            }
+            else
+            {
+               // block is a block we know about, but it is on a fork
+               try
+               {
+                  // assign to the fork_history declared above; a fresh local here would
+                  // shadow it and leave the outer vector empty when it is indexed below
+                  fork_history = _chain_db->get_block_ids_on_fork(reference_point);
+                  // returns a vector where the first element is the common ancestor with the preferred chain,
+                  // and the last element is the reference point you passed in
+                  assert(fork_history.size() >= 2);
+                  assert(fork_history.back() == reference_point);
+                  block_id_type last_non_fork_block = fork_history.front();
+                  fork_history.erase(fork_history.begin()); // remove the common ancestor
+
+                  if (last_non_fork_block == block_id_type()) // if the fork goes all the way back to genesis (does graphene's fork db allow this?)
+                     non_fork_high_block_num = 0;
+                  else
+                     non_fork_high_block_num = block_header::num_from_id(last_non_fork_block);
+
+                  high_block_num = non_fork_high_block_num + fork_history.size();
+                  assert(high_block_num == block_header::num_from_id(fork_history.back()));
+               }
+               catch (const fc::exception& e)
+               {
+                  // unable to get fork history for some reason.  maybe not linked?
+                  // we can't return a synopsis of its chain
+                  elog("Unable to construct a blockchain synopsis for reference hash ${hash}: ${exception}", ("hash", reference_point)("exception", e));
+                  throw;
+               }
+            }
+         }
+         else
+         {
+            // no reference point specified, summarize the whole block chain
+            high_block_num = _chain_db->head_block_num();
+            non_fork_high_block_num = high_block_num;
+            if (high_block_num == 0)
+               return synopsis; // we have no blocks
+         }
+
+         // at this point:
+         // low_block_num is the block before the first block we can undo,
+         // non_fork_high_block_num is the block before the fork (if the peer is on a fork, otherwise it is the same as high_block_num)
+         // high_block_num is the block number of the reference block, or the end of the chain if no reference provided
+
+         if (non_fork_high_block_num <= low_block_num)
+         {
+            wlog("Unable to generate a usable synopsis because the peer we're generating it for forked too long ago "
+                 "(our chains diverge after block #${non_fork_high_block_num} but we can only undo back to block #${low_block_num})",
+                 ("low_block_num", low_block_num)
+                 ("non_fork_high_block_num", non_fork_high_block_num));
+            return synopsis;
+         }
+
+         uint32_t true_high_block_num = high_block_num + number_of_blocks_after_reference_point;
+         do
+         {
+            // for each block in the synopsis, figure out where to pull the block id from.
+            // if it's <= non_fork_high_block_num, we grab it from the main blockchain;
+            // if it's not, we pull it from the fork history
+            if (low_block_num <= non_fork_high_block_num)
+               synopsis.push_back(_chain_db->get_block_id_for_num(low_block_num));
+            else
+               synopsis.push_back(fork_history[low_block_num - non_fork_high_block_num - 1]);
+            low_block_num += (true_high_block_num - low_block_num + 2) / 2;
+         }
+         while (low_block_num <= high_block_num);
+
+         idump((synopsis));
+         return synopsis;
+      } FC_CAPTURE_AND_RETHROW() }

      /**
       * Call this after the call to handle_message succeeds.
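
The spacing rule in the do/while loop above is easiest to see with concrete numbers. The following standalone sketch is not part of the commit: it reproduces only the low_block_num stepping for the simple case with no fork and no pending block ids (number_of_blocks_after_reference_point == 0), and shows that a 26-block chain summarizes to the "a n u x z" pattern described in the comment.

    // Illustrative sketch only; names are local to this example.
    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Block numbers a synopsis would contain when it covers [low_block_num, high_block_num]
    // and there are no fork blocks after the reference point.
    std::vector<uint32_t> synopsis_block_numbers(uint32_t low_block_num, uint32_t high_block_num)
    {
       std::vector<uint32_t> picked;
       const uint32_t true_high_block_num = high_block_num; // number_of_blocks_after_reference_point == 0
       do
       {
          picked.push_back(low_block_num);
          low_block_num += (true_high_block_num - low_block_num + 2) / 2;
       }
       while (low_block_num <= high_block_num);
       return picked;
    }

    int main()
    {
       // For 26 blocks numbered 1..26 (labeled 'a'..'z') this prints: a n u x z
       for (uint32_t num : synopsis_block_numbers(1, 26))
          std::cout << char('a' + num - 1) << ' ';
       std::cout << '\n';
    }

Each step covers half of the remaining distance to the head, so the steps shrink as low_block_num approaches the tip of the chain; that is why the ids are densest near the head, where a freshly connected peer is most likely to have diverged.
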

@@ -75,6 +75,16 @@ const signed_transaction& database::get_recent_transaction(const transaction_id_
    return itr->trx;
 }

+std::vector<block_id_type> database::get_block_ids_on_fork(block_id_type head_of_fork) const
+{
+  pair<fork_database::branch_type, fork_database::branch_type> branches = _fork_db.fetch_branch_from(head_block_id(), head_of_fork);
+  assert(branches.first.back()->id == branches.second.back()->id);
+  std::vector<block_id_type> result;
+  for (const item_ptr& fork_block : branches.second)
+    result.emplace_back(fork_block->id);
+  return result;
+}
+
 /**
  *  Push block "may fail" in which case every partial change is unwound.  After
  *  push block is successful the block is appended to the chain database on disk.

@@ -84,4 +84,10 @@ node_property_object& database::node_properties()
    return _node_property_object;
 }

+uint32_t database::last_non_undoable_block_num() const
+{
+   return head_block_num() - _undo_db.size();
+}
+
+
 } }
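
last_non_undoable_block_num() is what bounds the synopsis on the low end. A worked example with assumed figures, not taken from the commit: with a head block of 1000 and an undo database holding 50 reversible blocks, the function returns 950, so get_blockchain_synopsis() never offers a peer anything older than block 950, and a peer whose fork diverged before that point is reported as unsyncable instead.

    // Hypothetical numbers, purely for illustration.
    #include <cassert>
    #include <cstdint>

    int main()
    {
       uint32_t head_block_num = 1000; // assumed current chain head
       uint32_t undo_db_size   = 50;   // assumed number of blocks we can still undo
       uint32_t last_non_undoable = head_block_num - undo_db_size;
       assert(last_non_undoable == 950);
    }
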

@@ -137,6 +137,7 @@ namespace graphene { namespace chain {
         optional<signed_block> fetch_block_by_id( const block_id_type& id )const;
         optional<signed_block> fetch_block_by_number( uint32_t num )const;
         const signed_transaction& get_recent_transaction( const transaction_id_type& trx_id )const;
+        std::vector<block_id_type> get_block_ids_on_fork(block_id_type head_of_fork) const;

         /**
          *  Calculate the percent of block production slots that were missed in the

@@ -280,7 +281,8 @@ namespace graphene { namespace chain {
               callback();
               return;
            }

+         uint32_t last_non_undoable_block_num() const;
         //////////////////// db_init.cpp ////////////////////

         void initialize_evaluators();

@@ -99,10 +99,9 @@ namespace graphene { namespace net {
        * in our blockchain after the last item returned in the result,
        * or 0 if the result contains the last item in the blockchain
        */
-      virtual std::vector<item_hash_t> get_item_ids(uint32_t item_type,
-                                                    const std::vector<item_hash_t>& blockchain_synopsis,
-                                                    uint32_t& remaining_item_count,
-                                                    uint32_t limit = 2000) = 0;
+      virtual std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
+                                                     uint32_t& remaining_item_count,
+                                                     uint32_t limit = 2000) = 0;

       /**
        * Given the hash of the requested data, fetch the body.

@@ -119,14 +118,17 @@ namespace graphene { namespace net {
        *   If the blockchain is empty, it will return the empty list.
        *   If the blockchain has one block, it will return a list containing just that block.
        *   If it contains more than one block:
-       *     the first element in the list will be the hash of the genesis block
-       *     the second element will be the hash of an item at the half way point in the blockchain
-       *     the third will be ~3/4 of the way through the block chain
+       *     the first element in the list will be the hash of the highest numbered block that
+       *         we cannot undo
+       *     the second element will be the hash of an item at the half way point in the undoable
+       *         segment of the blockchain
+       *     the third will be ~3/4 of the way through the undoable segment of the block chain
        *     the fourth will be at ~7/8...
        *       &c.
        *     the last item in the list will be the hash of the most recent block on our preferred chain
        */
-      virtual std::vector<item_hash_t> get_blockchain_synopsis(uint32_t item_type, const graphene::net::item_hash_t& reference_point = graphene::net::item_hash_t(), uint32_t number_of_blocks_after_reference_point = 0) = 0;
+      virtual std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
+                                                               uint32_t number_of_blocks_after_reference_point) = 0;

       /**
        * Call this after the call to handle_message succeeds.

@@ -219,7 +219,7 @@ namespace graphene { namespace net
        uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
        bool peer_needs_sync_items_from_us;
        bool we_need_sync_items_from_peer;
-       fc::optional<boost::tuple<item_id, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
+       fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
        item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync.  fetch from another peer if this peer disconnects
        uint32_t last_block_number_delegate_has_seen; /// the number of the last block this peer has told us about that the delegate knows (ids_of_items_to_get[0] should be the id of block [this value + 1])
        item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
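
Storing the entire synopsis that was sent, rather than the single item_id this field used to hold, is what lets the inventory handler later in this diff sanity-check a peer's reply: the first block id the peer returns has to be one of the ids we actually offered. A minimal sketch of that rule, using an illustrative function name and a stand-in hash type that are not part of the codebase:

    #include <algorithm>
    #include <vector>

    using item_hash_t = unsigned int; // stand-in for the real hash type, for this sketch only

    // An empty reply is always acceptable; otherwise the peer's first id must be an
    // element of the synopsis we sent, or its list cannot build off anything we offered.
    bool reply_is_plausible(const std::vector<item_hash_t>& synopsis_sent,
                            const std::vector<item_hash_t>& ids_received)
    {
       if (ids_received.empty())
          return true;
       return std::find(synopsis_sent.begin(), synopsis_sent.end(), ids_received.front())
              != synopsis_sent.end();
    }

    int main()
    {
       std::vector<item_hash_t> synopsis_sent = {1, 14, 21, 24, 26};
       // Plausible: the reply continues from id 26, which we offered in the synopsis.
       return reply_is_plausible(synopsis_sent, {26, 27, 28}) ? 0 : 1;
    }
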

@@ -37,6 +37,7 @@
 #include <boost/multi_index/hashed_index.hpp>
 #include <boost/logic/tribool.hpp>
 #include <boost/range/algorithm_ext/push_back.hpp>
+#include <boost/range/algorithm/find.hpp>
 #include <boost/range/numeric.hpp>

 #include <boost/accumulators/accumulators.hpp>

@@ -279,7 +280,7 @@ namespace graphene { namespace net { namespace detail {
     (handle_message) \
     (handle_block) \
     (handle_transaction) \
-    (get_item_ids) \
+    (get_block_ids) \
     (get_item) \
     (get_chain_id) \
     (get_blockchain_synopsis) \

@@ -375,15 +376,13 @@ namespace graphene { namespace net { namespace detail {
      void handle_message( const message& ) override;
      bool handle_block( const graphene::net::block_message& block_message, bool sync_mode, std::vector<fc::uint160_t>& contained_transaction_message_ids ) override;
      void handle_transaction( const graphene::net::trx_message& transaction_message ) override;
-     std::vector<item_hash_t> get_item_ids(uint32_t item_type,
-                                           const std::vector<item_hash_t>& blockchain_synopsis,
-                                           uint32_t& remaining_item_count,
-                                           uint32_t limit = 2000) override;
+     std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
+                                            uint32_t& remaining_item_count,
+                                            uint32_t limit = 2000) override;
      message get_item( const item_id& id ) override;
      chain_id_type get_chain_id() const override;
-     std::vector<item_hash_t> get_blockchain_synopsis(uint32_t item_type,
-                                                      const graphene::net::item_hash_t& reference_point = graphene::net::item_hash_t(),
-                                                      uint32_t number_of_blocks_after_reference_point = 0) override;
+     std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
+                                                      uint32_t number_of_blocks_after_reference_point) override;
      void sync_status( uint32_t item_type, uint32_t item_count ) override;
      void connection_count_changed( uint32_t c ) override;
      uint32_t get_block_number(const item_hash_t& block_id) override;

@@ -1324,7 +1323,7 @@ namespace graphene { namespace net { namespace detail {
        {
          wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${id}",
               ("peer", active_peer->get_remote_endpoint())
-              ("id", active_peer->item_ids_requested_from_peer->get<0>().item_hash));
+              ("id", active_peer->item_ids_requested_from_peer->get<0>().back()));
          disconnect_due_to_request_timeout = true;
        }
      if (!disconnect_due_to_request_timeout)

@@ -2155,9 +2154,8 @@ namespace graphene { namespace net { namespace detail {
            ( "synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis ) );

      blockchain_item_ids_inventory_message reply_message;
-     reply_message.item_hashes_available = _delegate->get_item_ids( fetch_blockchain_item_ids_message_received.item_type,
-                                                                    fetch_blockchain_item_ids_message_received.blockchain_synopsis,
-                                                                    reply_message.total_remaining_item_count );
+     reply_message.item_hashes_available = _delegate->get_block_ids(fetch_blockchain_item_ids_message_received.blockchain_synopsis,
+                                                                    reply_message.total_remaining_item_count );
      reply_message.item_type = fetch_blockchain_item_ids_message_received.item_type;

      bool disconnect_from_inhibited_peer = false;

@@ -2170,8 +2168,7 @@ namespace graphene { namespace net { namespace detail {
          reply_message.item_hashes_available.size() == 1 &&
          std::find(fetch_blockchain_item_ids_message_received.blockchain_synopsis.begin(),
                    fetch_blockchain_item_ids_message_received.blockchain_synopsis.end(),
-                   reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end()
-         )
+                   reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end() )
      {
        /* the last item in the peer's list matches the last item in our list */
        originating_peer->peer_needs_sync_items_from_us = false;

@@ -2270,24 +2267,33 @@ namespace graphene { namespace net { namespace detail {
      // This is pretty expensive, we should find a better way to do this
      std::unique_ptr<std::vector<item_hash_t> > original_ids_of_items_to_get(new std::vector<item_hash_t>(peer->ids_of_items_to_get.begin(), peer->ids_of_items_to_get.end()));

-     std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(_sync_item_type, reference_point, number_of_blocks_after_reference_point);
-     assert(reference_point == item_hash_t() || !synopsis.empty());
+     std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);

      // if we passed in a reference point, we believe it is one the client has already accepted and should
      // be able to generate a synopsis based on it
-     if( reference_point != item_hash_t() && synopsis.empty() )
-        synopsis = _delegate->get_blockchain_synopsis( _sync_item_type, reference_point, number_of_blocks_after_reference_point );
      // just for debugging, enable this and set a breakpoint to step through
+     if (synopsis.empty())
+        synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
+
+     // TODO: it's possible that the returned synopsis is empty if the blockchain is empty (that's fine)
+     // or if the reference point is now past our undo history (that's not).
+     // in the second case, we should mark this peer as one we're unable to sync with and
+     // disconnect them.
+     if (reference_point != item_hash_t() && synopsis.empty())
+        FC_THROW_EXCEPTION(block_older_than_undo_history, "You are on a fork I'm unable to switch to");

      if( number_of_blocks_after_reference_point )
      {
        // then the synopsis is incomplete, add the missing elements from ids_of_items_to_get
        uint32_t true_high_block_num = reference_point_block_num + number_of_blocks_after_reference_point;
-       uint32_t low_block_num = 1;
+
+       // in order to generate a seamless synopsis, we need to be using the same low_block_num as the
+       // backend code; the first block in the synopsis will be the low block number it used
+       uint32_t low_block_num = synopsis.empty() ? 1 : _delegate->get_block_number(synopsis.front());
+
        do
        {
          if( low_block_num > reference_point_block_num )
-           synopsis.push_back( (*original_ids_of_items_to_get)[low_block_num - reference_point_block_num - 1] );
-         low_block_num += ( (true_high_block_num - low_block_num + 2 ) / 2 );
+           synopsis.push_back((*original_ids_of_items_to_get)[low_block_num - reference_point_block_num - 1]);
+         low_block_num += (true_high_block_num - low_block_num + 2) / 2;
        }
        while ( low_block_num <= true_high_block_num );
        assert(synopsis.back() == original_ids_of_items_to_get->back());

@@ -2305,14 +2311,25 @@ namespace graphene { namespace net { namespace detail {
        peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
      }

-     std::vector<item_hash_t> blockchain_synopsis = create_blockchain_synopsis_for_peer( peer );
-     item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back();
-     dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})",
-           ( "last_item_seen", last_item_seen )
-           ( "peer", peer->get_remote_endpoint() )
-           ( "blockchain_synopsis", blockchain_synopsis ) );
-     peer->item_ids_requested_from_peer = boost::make_tuple( item_id(_sync_item_type, last_item_seen ), fc::time_point::now() );
-     peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) );
+     fc::oexception synopsis_exception;
+     try
+     {
+       std::vector<item_hash_t> blockchain_synopsis = create_blockchain_synopsis_for_peer( peer );
+
+       item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back();
+       dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})",
+             ( "last_item_seen", last_item_seen )
+             ( "peer", peer->get_remote_endpoint() )
+             ( "blockchain_synopsis", blockchain_synopsis ) );
+       peer->item_ids_requested_from_peer = boost::make_tuple( blockchain_synopsis, fc::time_point::now() );
+       peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) );
+     }
+     catch (const block_older_than_undo_history& e)
+     {
+       synopsis_exception = e;
+     }
+     if (synopsis_exception)
+       disconnect_from_peer(peer, "You are on a fork I'm unable to switch to");
    }

    void node_impl::on_blockchain_item_ids_inventory_message(peer_connection* originating_peer,

@@ -2322,6 +2339,33 @@ namespace graphene { namespace net { namespace detail {
      // ignore unless we asked for the data
      if( originating_peer->item_ids_requested_from_peer )
      {
+       // verify that the list of block ids the peer sent is a valid response to our request;
+       // it should either be an empty list of blocks, or a list of blocks that builds off of one of
+       // the blocks in the synopsis we sent
+       if (!blockchain_item_ids_inventory_message_received.item_hashes_available.empty())
+       {
+         const std::vector<item_hash_t>& synopsis_sent_in_request = originating_peer->item_ids_requested_from_peer->get<0>();
+         const item_hash_t& first_item_hash = blockchain_item_ids_inventory_message_received.item_hashes_available.front();
+         if (boost::range::find(synopsis_sent_in_request, first_item_hash) == synopsis_sent_in_request.end())
+         {
+           wlog("Invalid response from peer ${peer_endpoint}.  We requested a list of sync blocks based on the synopsis ${synopsis}, but they "
+                "provided a list of blocks starting with ${first_block}",
+                ("peer_endpoint", originating_peer->get_remote_endpoint())
+                ("synopsis", synopsis_sent_in_request)
+                ("first_block", first_item_hash));
+           // TODO: enable these once committed
+           //fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks.  I asked for blocks following something in "
+           //                                            "${synopsis}, but you returned a list of blocks starting with ${first_block} which wasn't one of your choices",
+           //                                            ("synopsis", synopsis_sent_in_request)
+           //                                            ("first_block", first_item_hash)));
+           //disconnect_from_peer(originating_peer,
+           //                     "You gave an invalid response to my request for sync blocks",
+           //                     true, error_for_peer);
+           disconnect_from_peer(originating_peer,
+                                "You gave an invalid response to my request for sync blocks");
+           return;
+         }
+       }
      originating_peer->item_ids_requested_from_peer.reset();

      dlog( "sync: received a list of ${count} available items from ${peer_endpoint}",

@@ -5185,12 +5229,11 @@ namespace graphene { namespace net { namespace detail {
      INVOKE_AND_COLLECT_STATISTICS(handle_transaction, transaction_message);
    }

-   std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_item_ids(uint32_t item_type,
-                                                                                      const std::vector<item_hash_t>& blockchain_synopsis,
-                                                                                      uint32_t& remaining_item_count,
-                                                                                      uint32_t limit /* = 2000 */)
+   std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
+                                                                                      uint32_t& remaining_item_count,
+                                                                                      uint32_t limit /* = 2000 */)
    {
-     INVOKE_AND_COLLECT_STATISTICS(get_item_ids, item_type, blockchain_synopsis, remaining_item_count, limit);
+     INVOKE_AND_COLLECT_STATISTICS(get_block_ids, blockchain_synopsis, remaining_item_count, limit);
    }

    message statistics_gathering_node_delegate_wrapper::get_item( const item_id& id )

@@ -5203,11 +5246,9 @@ namespace graphene { namespace net { namespace detail {
      INVOKE_AND_COLLECT_STATISTICS(get_chain_id);
    }

-   std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(uint32_t item_type,
-                                                                                                const graphene::net::item_hash_t& reference_point /* = graphene::net::item_hash_t() */,
-                                                                                                uint32_t number_of_blocks_after_reference_point /* = 0 */)
+   std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(const item_hash_t& reference_point, uint32_t number_of_blocks_after_reference_point)
    {
-     INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, item_type, reference_point, number_of_blocks_after_reference_point);
+     INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, reference_point, number_of_blocks_after_reference_point);
    }

    void statistics_gathering_node_delegate_wrapper::sync_status( uint32_t item_type, uint32_t item_count )