Merge branch 'p2p_sync_fixes'
This commit is contained in:
commit
1202c0258d
10 changed files with 396 additions and 121 deletions
|
|
@ -27,6 +27,7 @@
|
|||
#include <graphene/egenesis/egenesis.hpp>
|
||||
|
||||
#include <graphene/net/core_messages.hpp>
|
||||
#include <graphene/net/exceptions.hpp>
|
||||
|
||||
#include <graphene/time/time.hpp>
|
||||
|
||||
|
|
@ -48,6 +49,8 @@
|
|||
#include <fc/log/logger.hpp>
|
||||
#include <fc/log/logger_config.hpp>
|
||||
|
||||
#include <boost/range/adaptor/reversed.hpp>
|
||||
|
||||
namespace graphene { namespace app {
|
||||
using net::item_hash_t;
|
||||
using net::item_id;
|
||||
|
|
@ -380,6 +383,10 @@ namespace detail {
|
|||
ilog("Got block #${n} from network", ("n", blk_msg.block.block_num()));
|
||||
|
||||
try {
|
||||
// TODO: in the case where this block is valid but on a fork that's too old for us to switch to,
|
||||
// you can help the network code out by throwing a block_older_than_undo_history exception.
|
||||
// when the net code sees that, it will stop trying to push blocks from that chain, but
|
||||
// leave that peer connected so that they can get sync blocks from us
|
||||
bool result = _chain_db->push_block(blk_msg.block, _is_block_producer ? database::skip_nothing : database::skip_transaction_signatures);
|
||||
|
||||
// the block was accepted, so we now know all of the transactions contained in the block
|
||||
|
|
@ -422,6 +429,13 @@ namespace detail {
|
|||
FC_THROW( "Invalid Message Type" );
|
||||
}
|
||||
|
||||
bool is_included_block(const block_id_type& block_id)
|
||||
{
|
||||
uint32_t block_num = block_header::num_from_id(block_id);
|
||||
block_id_type block_id_in_preferred_chain = _chain_db->get_block_id_for_num(block_num);
|
||||
return block_id == block_id_in_preferred_chain;
|
||||
}
|
||||
|
||||
/**
|
||||
* Assuming all data elements are ordered in some way, this method should
|
||||
* return up to limit ids that occur *after* the last ID in synopsis that
|
||||
|
|
@ -431,31 +445,42 @@ namespace detail {
|
|||
* in our blockchain after the last item returned in the result,
|
||||
* or 0 if the result contains the last item in the blockchain
|
||||
*/
|
||||
virtual std::vector<item_hash_t> get_item_ids(uint32_t item_type,
|
||||
const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit) override
|
||||
virtual std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit) override
|
||||
{ try {
|
||||
FC_ASSERT( item_type == graphene::net::block_message_type );
|
||||
vector<block_id_type> result;
|
||||
vector<block_id_type> result;
|
||||
remaining_item_count = 0;
|
||||
if( _chain_db->head_block_num() == 0 )
|
||||
return result;
|
||||
|
||||
result.reserve(limit);
|
||||
block_id_type last_known_block_id;
|
||||
auto itr = blockchain_synopsis.rbegin();
|
||||
while( itr != blockchain_synopsis.rend() )
|
||||
{
|
||||
if( _chain_db->is_known_block(*itr) || *itr == block_id_type() )
|
||||
{
|
||||
last_known_block_id = *itr;
|
||||
break;
|
||||
}
|
||||
++itr;
|
||||
}
|
||||
|
||||
for( auto num = block_header::num_from_id(last_known_block_id);
|
||||
if (blockchain_synopsis.empty() ||
|
||||
(blockchain_synopsis.size() == 1 && blockchain_synopsis[0] == block_id_type()))
|
||||
{
|
||||
// peer has sent us an empty synopsis meaning they have no blocks.
|
||||
// A bug in old versions would cause them to send a synopsis containing block 000000000
|
||||
// when they had an empty blockchain, so pretend they sent the right thing here.
|
||||
|
||||
// do nothing, leave last_known_block_id set to zero
|
||||
}
|
||||
else
|
||||
{
|
||||
bool found_a_block_in_synopsis = false;
|
||||
for (const item_hash_t& block_id_in_synopsis : boost::adaptors::reverse(blockchain_synopsis))
|
||||
if (block_id_in_synopsis == block_id_type() ||
|
||||
(_chain_db->is_known_block(block_id_in_synopsis) && is_included_block(block_id_in_synopsis)))
|
||||
{
|
||||
last_known_block_id = block_id_in_synopsis;
|
||||
found_a_block_in_synopsis = true;
|
||||
break;
|
||||
}
|
||||
if (!found_a_block_in_synopsis)
|
||||
FC_THROW_EXCEPTION(graphene::net::peer_is_on_an_unreachable_fork, "Unable to provide a list of blocks starting at any of the blocks in peer's synopsis");
|
||||
}
|
||||
for( uint32_t num = block_header::num_from_id(last_known_block_id);
|
||||
num <= _chain_db->head_block_num() && result.size() < limit;
|
||||
++num )
|
||||
if( num > 0 )
|
||||
|
|
@ -492,38 +517,180 @@ namespace detail {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a synopsis of the blockchain used for syncing.
|
||||
* This consists of a list of selected item hashes from our current preferred
|
||||
* blockchain, exponentially falling off into the past. Horrible explanation.
|
||||
* Returns a synopsis of the blockchain used for syncing. This consists of a list of
|
||||
* block hashes at intervals exponentially increasing towards the genesis block.
|
||||
* When syncing to a peer, the peer uses this data to determine if we're on the same
|
||||
* fork as they are, and if not, what blocks they need to send us to get us on their
|
||||
* fork.
|
||||
*
|
||||
* If the blockchain is empty, it will return the empty list.
|
||||
* If the blockchain has one block, it will return a list containing just that block.
|
||||
* If it contains more than one block:
|
||||
* the first element in the list will be the hash of the genesis block
|
||||
* the second element will be the hash of an item at the half way point in the blockchain
|
||||
* the third will be ~3/4 of the way through the block chain
|
||||
* the fourth will be at ~7/8...
|
||||
* &c.
|
||||
* the last item in the list will be the hash of the most recent block on our preferred chain
|
||||
* In the over-simplified case, this is a straighforward synopsis of our current
|
||||
* preferred blockchain; when we first connect up to a peer, this is what we will be sending.
|
||||
* It looks like this:
|
||||
* If the blockchain is empty, it will return the empty list.
|
||||
* If the blockchain has one block, it will return a list containing just that block.
|
||||
* If it contains more than one block:
|
||||
* the first element in the list will be the hash of the highest numbered block that
|
||||
* we cannot undo
|
||||
* the second element will be the hash of an item at the half way point in the undoable
|
||||
* segment of the blockchain
|
||||
* the third will be ~3/4 of the way through the undoable segment of the block chain
|
||||
* the fourth will be at ~7/8...
|
||||
* &c.
|
||||
* the last item in the list will be the hash of the most recent block on our preferred chain
|
||||
* so if the blockchain had 26 blocks labeled a - z, the synopsis would be:
|
||||
* a n u x z
|
||||
* the idea being that by sending a small (<30) number of block ids, we can summarize a huge
|
||||
* blockchain. The block ids are more dense near the end of the chain where because we are
|
||||
* more likely to be almost in sync when we first connect, and forks are likely to be short.
|
||||
* If the peer we're syncing with in our example is on a fork that started at block 'v',
|
||||
* then they will reply to our synopsis with a list of all blocks starting from block 'u',
|
||||
* the last block they know that we had in common.
|
||||
*
|
||||
* In the real code, there are several complications.
|
||||
*
|
||||
* First, as an optimization, we don't usually send a synopsis of the entire blockchain, we
|
||||
* send a synopsis of only the segment of the blockchain that we have undo data for. If their
|
||||
* fork doesn't build off of something in our undo history, we would be unable to switch, so there's
|
||||
* no reason to fetch the blocks.
|
||||
*
|
||||
* Second, when a peer replies to our initial synopsis and gives us a list of the blocks they think
|
||||
* we are missing, they only send a chunk of a few thousand blocks at once. After we get those
|
||||
* block ids, we need to request more blocks by sending another synopsis (we can't just say "send me
|
||||
* the next 2000 ids" because they may have switched forks themselves and they don't track what
|
||||
* they've sent us). For faster performance, we want to get a fairly long list of block ids first,
|
||||
* then start downloading the blocks.
|
||||
* The peer doesn't handle these follow-up block id requests any different from the initial request;
|
||||
* it treats the synopsis we send as our blockchain and bases its response entirely off that. So to
|
||||
* get the response we want (the next chunk of block ids following the last one they sent us, or,
|
||||
* failing that, the shortest fork off of the last list of block ids they sent), we need to construct
|
||||
* a synopsis as if our blockchain was made up of:
|
||||
* 1. the blocks in our block chain up to the fork point (if there is a fork) or the head block (if no fork)
|
||||
* 2. the blocks we've already pushed from their fork (if there's a fork)
|
||||
* 3. the block ids they've previously sent us
|
||||
* Segment 3 is handled in the p2p code, it just tells us the number of blocks it has (in
|
||||
* number_of_blocks_after_reference_point) so we can leave space in the synopsis for them.
|
||||
* We're responsible for constructing the synopsis of Segments 1 and 2 from our active blockchain and
|
||||
* fork database. The reference_point parameter is the last block from that peer that has been
|
||||
* successfully pushed to the blockchain, so that tells us whether the peer is on a fork or on
|
||||
* the main chain.
|
||||
*/
|
||||
virtual std::vector<item_hash_t> get_blockchain_synopsis(uint32_t item_type,
|
||||
const graphene::net::item_hash_t& reference_point,
|
||||
virtual std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
|
||||
uint32_t number_of_blocks_after_reference_point) override
|
||||
{ try {
|
||||
std::vector<item_hash_t> result;
|
||||
result.reserve(30);
|
||||
uint32_t head_block_num = _chain_db->head_block_num();
|
||||
result.push_back(_chain_db->head_block_id());
|
||||
uint32_t current = 1;
|
||||
while( current < head_block_num )
|
||||
{
|
||||
result.push_back(_chain_db->get_block_id_for_num(head_block_num - current));
|
||||
current = current*2;
|
||||
}
|
||||
std::reverse( result.begin(), result.end() );
|
||||
//idump((reference_point)(number_of_blocks_after_reference_point)(result));
|
||||
return result;
|
||||
} FC_CAPTURE_AND_RETHROW( (reference_point)(number_of_blocks_after_reference_point) ) }
|
||||
std::vector<item_hash_t> synopsis;
|
||||
synopsis.reserve(30);
|
||||
uint32_t high_block_num;
|
||||
uint32_t non_fork_high_block_num;
|
||||
uint32_t low_block_num = _chain_db->last_non_undoable_block_num();
|
||||
std::vector<block_id_type> fork_history;
|
||||
|
||||
if (reference_point != item_hash_t())
|
||||
{
|
||||
// the node is asking for a summary of the block chain up to a specified
|
||||
// block, which may or may not be on a fork
|
||||
// for now, assume it's not on a fork
|
||||
if (is_included_block(reference_point))
|
||||
{
|
||||
// reference_point is a block we know about and is on the main chain
|
||||
uint32_t reference_point_block_num = block_header::num_from_id(reference_point);
|
||||
assert(reference_point_block_num > 0);
|
||||
high_block_num = reference_point_block_num;
|
||||
non_fork_high_block_num = high_block_num;
|
||||
|
||||
if (reference_point_block_num < low_block_num)
|
||||
{
|
||||
// we're on the same fork (at least as far as reference_point) but we've passed
|
||||
// reference point and could no longer undo that far if we diverged after that
|
||||
// block. This should probably only happen due to a race condition where
|
||||
// the network thread calls this function, and then immediately pushes a bunch of blocks,
|
||||
// then the main thread finally processes this function.
|
||||
// with the current framework, there's not much we can do to tell the network
|
||||
// thread what our current head block is, so we'll just pretend that
|
||||
// our head is actually the reference point.
|
||||
// this *may* enable us to fetch blocks that we're unable to push, but that should
|
||||
// be a rare case (and correctly handled)
|
||||
low_block_num = reference_point_block_num;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// block is a block we know about, but it is on a fork
|
||||
try
|
||||
{
|
||||
std::vector<block_id_type> fork_history = _chain_db->get_block_ids_on_fork(reference_point);
|
||||
// returns a vector where the first element is the common ancestor with the preferred chain,
|
||||
// and the last element is the reference point you passed in
|
||||
assert(fork_history.size() >= 2);
|
||||
assert(fork_history.back() == reference_point);
|
||||
block_id_type last_non_fork_block = fork_history.front();
|
||||
fork_history.erase(fork_history.begin()); // remove the common ancestor
|
||||
|
||||
if (last_non_fork_block == block_id_type()) // if the fork goes all the way back to genesis (does graphene's fork db allow this?)
|
||||
non_fork_high_block_num = 0;
|
||||
else
|
||||
non_fork_high_block_num = block_header::num_from_id(last_non_fork_block);
|
||||
|
||||
high_block_num = non_fork_high_block_num + fork_history.size();
|
||||
assert(high_block_num == block_header::num_from_id(fork_history.back()));
|
||||
}
|
||||
catch (const fc::exception& e)
|
||||
{
|
||||
// unable to get fork history for some reason. maybe not linked?
|
||||
// we can't return a synopsis of its chain
|
||||
elog("Unable to construct a blockchain synopsis for reference hash ${hash}: ${exception}", ("hash", reference_point)("exception", e));
|
||||
throw;
|
||||
}
|
||||
if (non_fork_high_block_num < low_block_num)
|
||||
{
|
||||
wlog("Unable to generate a usable synopsis because the peer we're generating it for forked too long ago "
|
||||
"(our chains diverge after block #${non_fork_high_block_num} but only undoable to block #${low_block_num})",
|
||||
("low_block_num", low_block_num)
|
||||
("non_fork_high_block_num", non_fork_high_block_num));
|
||||
FC_THROW_EXCEPTION(graphene::net::block_older_than_undo_history, "Peer is are on a fork I'm unable to switch to");
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// no reference point specified, summarize the whole block chain
|
||||
high_block_num = _chain_db->head_block_num();
|
||||
non_fork_high_block_num = high_block_num;
|
||||
if (high_block_num == 0)
|
||||
return synopsis; // we have no blocks
|
||||
}
|
||||
|
||||
// at this point:
|
||||
// low_block_num is the block before the first block we can undo,
|
||||
// non_fork_high_block_num is the block before the fork (if the peer is on a fork, or otherwise it is the same as high_block_num)
|
||||
// high_block_num is the block number of the reference block, or the end of the chain if no reference provided
|
||||
|
||||
// true_high_block_num is the ending block number after the network code appends any item ids it
|
||||
// knows about that we don't
|
||||
uint32_t true_high_block_num = high_block_num + number_of_blocks_after_reference_point;
|
||||
do
|
||||
{
|
||||
// for each block in the synopsis, figure out where to pull the block id from.
|
||||
// if it's <= non_fork_high_block_num, we grab it from the main blockchain;
|
||||
// if it's not, we pull it from the fork history
|
||||
if (low_block_num <= non_fork_high_block_num)
|
||||
synopsis.push_back(_chain_db->get_block_id_for_num(low_block_num));
|
||||
else
|
||||
{
|
||||
// for debugging
|
||||
int index = low_block_num - non_fork_high_block_num - 1;
|
||||
if (index < 0 || index > fork_history.size())
|
||||
{
|
||||
int i = 0;
|
||||
}
|
||||
synopsis.push_back(fork_history[low_block_num - non_fork_high_block_num - 1]);
|
||||
}
|
||||
low_block_num += (true_high_block_num - low_block_num + 2) / 2;
|
||||
}
|
||||
while (low_block_num <= high_block_num);
|
||||
|
||||
idump((synopsis));
|
||||
return synopsis;
|
||||
} FC_CAPTURE_AND_RETHROW() }
|
||||
|
||||
/**
|
||||
* Call this after the call to handle_message succeeds.
|
||||
|
|
|
|||
|
|
@ -76,6 +76,16 @@ const signed_transaction& database::get_recent_transaction(const transaction_id_
|
|||
return itr->trx;
|
||||
}
|
||||
|
||||
std::vector<block_id_type> database::get_block_ids_on_fork(block_id_type head_of_fork) const
|
||||
{
|
||||
pair<fork_database::branch_type, fork_database::branch_type> branches = _fork_db.fetch_branch_from(head_block_id(), head_of_fork);
|
||||
assert(branches.first.back()->id == branches.second.back()->id);
|
||||
std::vector<block_id_type> result;
|
||||
for (const item_ptr& fork_block : branches.second)
|
||||
result.emplace_back(fork_block->id);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Push block "may fail" in which case every partial change is unwound. After
|
||||
* push block is successful the block is appended to the chain database on disk.
|
||||
|
|
|
|||
|
|
@ -84,4 +84,10 @@ node_property_object& database::node_properties()
|
|||
return _node_property_object;
|
||||
}
|
||||
|
||||
uint32_t database::last_non_undoable_block_num() const
|
||||
{
|
||||
return head_block_num() - _undo_db.size();
|
||||
}
|
||||
|
||||
|
||||
} }
|
||||
|
|
|
|||
|
|
@ -112,6 +112,7 @@ namespace graphene { namespace chain {
|
|||
optional<signed_block> fetch_block_by_id( const block_id_type& id )const;
|
||||
optional<signed_block> fetch_block_by_number( uint32_t num )const;
|
||||
const signed_transaction& get_recent_transaction( const transaction_id_type& trx_id )const;
|
||||
std::vector<block_id_type> get_block_ids_on_fork(block_id_type head_of_fork) const;
|
||||
|
||||
/**
|
||||
* Calculate the percent of block production slots that were missed in the
|
||||
|
|
@ -245,6 +246,8 @@ namespace graphene { namespace chain {
|
|||
|
||||
node_property_object& node_properties();
|
||||
|
||||
|
||||
uint32_t last_non_undoable_block_num() const;
|
||||
//////////////////// db_init.cpp ////////////////////
|
||||
|
||||
void initialize_evaluators();
|
||||
|
|
|
|||
|
|
@ -26,5 +26,6 @@ namespace graphene { namespace net {
|
|||
FC_DECLARE_DERIVED_EXCEPTION( insufficient_relay_fee, graphene::net::net_exception, 90002, "insufficient relay fee" );
|
||||
FC_DECLARE_DERIVED_EXCEPTION( already_connected_to_requested_peer, graphene::net::net_exception, 90003, "already connected to requested peer" );
|
||||
FC_DECLARE_DERIVED_EXCEPTION( block_older_than_undo_history, graphene::net::net_exception, 90004, "block is older than our undo history allows us to process" );
|
||||
FC_DECLARE_DERIVED_EXCEPTION( peer_is_on_an_unreachable_fork, graphene::net::net_exception, 90005, "peer is on another fork" );
|
||||
|
||||
} }
|
||||
|
|
|
|||
|
|
@ -99,10 +99,9 @@ namespace graphene { namespace net {
|
|||
* in our blockchain after the last item returned in the result,
|
||||
* or 0 if the result contains the last item in the blockchain
|
||||
*/
|
||||
virtual std::vector<item_hash_t> get_item_ids(uint32_t item_type,
|
||||
const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit = 2000) = 0;
|
||||
virtual std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit = 2000) = 0;
|
||||
|
||||
/**
|
||||
* Given the hash of the requested data, fetch the body.
|
||||
|
|
@ -119,14 +118,17 @@ namespace graphene { namespace net {
|
|||
* If the blockchain is empty, it will return the empty list.
|
||||
* If the blockchain has one block, it will return a list containing just that block.
|
||||
* If it contains more than one block:
|
||||
* the first element in the list will be the hash of the genesis block
|
||||
* the second element will be the hash of an item at the half way point in the blockchain
|
||||
* the third will be ~3/4 of the way through the block chain
|
||||
* the first element in the list will be the hash of the highest numbered block that
|
||||
* we cannot undo
|
||||
* the second element will be the hash of an item at the half way point in the undoable
|
||||
* segment of the blockchain
|
||||
* the third will be ~3/4 of the way through the undoable segment of the block chain
|
||||
* the fourth will be at ~7/8...
|
||||
* &c.
|
||||
* the last item in the list will be the hash of the most recent block on our preferred chain
|
||||
*/
|
||||
virtual std::vector<item_hash_t> get_blockchain_synopsis(uint32_t item_type, const graphene::net::item_hash_t& reference_point = graphene::net::item_hash_t(), uint32_t number_of_blocks_after_reference_point = 0) = 0;
|
||||
virtual std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
|
||||
uint32_t number_of_blocks_after_reference_point) = 0;
|
||||
|
||||
/**
|
||||
* Call this after the call to handle_message succeeds.
|
||||
|
|
|
|||
|
|
@ -219,7 +219,7 @@ namespace graphene { namespace net
|
|||
uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
|
||||
bool peer_needs_sync_items_from_us;
|
||||
bool we_need_sync_items_from_peer;
|
||||
fc::optional<boost::tuple<item_id, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
|
||||
fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
|
||||
item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
|
||||
uint32_t last_block_number_delegate_has_seen; /// the number of the last block this peer has told us about that the delegate knows (ids_of_items_to_get[0] should be the id of block [this value + 1])
|
||||
item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@
|
|||
#include <boost/multi_index/hashed_index.hpp>
|
||||
#include <boost/logic/tribool.hpp>
|
||||
#include <boost/range/algorithm_ext/push_back.hpp>
|
||||
#include <boost/range/algorithm/find.hpp>
|
||||
#include <boost/range/numeric.hpp>
|
||||
|
||||
#include <boost/accumulators/accumulators.hpp>
|
||||
|
|
@ -279,7 +280,7 @@ namespace graphene { namespace net { namespace detail {
|
|||
(handle_message) \
|
||||
(handle_block) \
|
||||
(handle_transaction) \
|
||||
(get_item_ids) \
|
||||
(get_block_ids) \
|
||||
(get_item) \
|
||||
(get_chain_id) \
|
||||
(get_blockchain_synopsis) \
|
||||
|
|
@ -375,15 +376,13 @@ namespace graphene { namespace net { namespace detail {
|
|||
void handle_message( const message& ) override;
|
||||
bool handle_block( const graphene::net::block_message& block_message, bool sync_mode, std::vector<fc::uint160_t>& contained_transaction_message_ids ) override;
|
||||
void handle_transaction( const graphene::net::trx_message& transaction_message ) override;
|
||||
std::vector<item_hash_t> get_item_ids(uint32_t item_type,
|
||||
const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit = 2000) override;
|
||||
std::vector<item_hash_t> get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit = 2000) override;
|
||||
message get_item( const item_id& id ) override;
|
||||
chain_id_type get_chain_id() const override;
|
||||
std::vector<item_hash_t> get_blockchain_synopsis(uint32_t item_type,
|
||||
const graphene::net::item_hash_t& reference_point = graphene::net::item_hash_t(),
|
||||
uint32_t number_of_blocks_after_reference_point = 0) override;
|
||||
std::vector<item_hash_t> get_blockchain_synopsis(const item_hash_t& reference_point,
|
||||
uint32_t number_of_blocks_after_reference_point) override;
|
||||
void sync_status( uint32_t item_type, uint32_t item_count ) override;
|
||||
void connection_count_changed( uint32_t c ) override;
|
||||
uint32_t get_block_number(const item_hash_t& block_id) override;
|
||||
|
|
@ -1322,9 +1321,9 @@ namespace graphene { namespace net { namespace detail {
|
|||
active_peer->item_ids_requested_from_peer &&
|
||||
active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold)
|
||||
{
|
||||
wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${id}",
|
||||
wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}",
|
||||
("peer", active_peer->get_remote_endpoint())
|
||||
("id", active_peer->item_ids_requested_from_peer->get<0>().item_hash));
|
||||
("synopsis", active_peer->item_ids_requested_from_peer->get<0>()));
|
||||
disconnect_due_to_request_timeout = true;
|
||||
}
|
||||
if (!disconnect_due_to_request_timeout)
|
||||
|
|
@ -2145,20 +2144,38 @@ namespace graphene { namespace net { namespace detail {
|
|||
const fetch_blockchain_item_ids_message& fetch_blockchain_item_ids_message_received)
|
||||
{
|
||||
VERIFY_CORRECT_THREAD();
|
||||
item_id peers_last_item_seen;
|
||||
if( !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() )
|
||||
peers_last_item_seen = item_id( fetch_blockchain_item_ids_message_received.item_type,
|
||||
fetch_blockchain_item_ids_message_received.blockchain_synopsis.back() );
|
||||
dlog( "sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})",
|
||||
( "last_item_seen", peers_last_item_seen )
|
||||
( "peer_endpoint", originating_peer->get_remote_endpoint() )
|
||||
( "synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis ) );
|
||||
item_id peers_last_item_seen = item_id(fetch_blockchain_item_ids_message_received.item_type, item_hash_t());
|
||||
if (fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty())
|
||||
{
|
||||
dlog("sync: received a request for item ids starting at the beginning of the chain from peer ${peer_endpoint} (full request: ${synopsis})",
|
||||
("peer_endpoint", originating_peer->get_remote_endpoint())
|
||||
("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
|
||||
}
|
||||
else
|
||||
{
|
||||
item_hash_t peers_last_item_hash_seen = fetch_blockchain_item_ids_message_received.blockchain_synopsis.back();
|
||||
dlog("sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})",
|
||||
("last_item_seen", peers_last_item_hash_seen)
|
||||
("peer_endpoint", originating_peer->get_remote_endpoint())
|
||||
("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
|
||||
peers_last_item_seen.item_hash = peers_last_item_hash_seen;
|
||||
}
|
||||
|
||||
blockchain_item_ids_inventory_message reply_message;
|
||||
reply_message.item_hashes_available = _delegate->get_item_ids( fetch_blockchain_item_ids_message_received.item_type,
|
||||
fetch_blockchain_item_ids_message_received.blockchain_synopsis,
|
||||
reply_message.total_remaining_item_count );
|
||||
reply_message.item_type = fetch_blockchain_item_ids_message_received.item_type;
|
||||
reply_message.total_remaining_item_count = 0;
|
||||
try
|
||||
{
|
||||
reply_message.item_hashes_available = _delegate->get_block_ids(fetch_blockchain_item_ids_message_received.blockchain_synopsis,
|
||||
reply_message.total_remaining_item_count);
|
||||
}
|
||||
catch (const peer_is_on_an_unreachable_fork&)
|
||||
{
|
||||
dlog("Peer is on a fork and there's no set of blocks we can provide to switch them to our fork");
|
||||
// we reply with an empty list as if we had an empty blockchain;
|
||||
// we don't want to disconnect because they may be able to provide
|
||||
// us with blocks on their chain
|
||||
}
|
||||
|
||||
bool disconnect_from_inhibited_peer = false;
|
||||
// if our client doesn't have any items after the item the peer requested, it will send back
|
||||
|
|
@ -2170,8 +2187,7 @@ namespace graphene { namespace net { namespace detail {
|
|||
reply_message.item_hashes_available.size() == 1 &&
|
||||
std::find(fetch_blockchain_item_ids_message_received.blockchain_synopsis.begin(),
|
||||
fetch_blockchain_item_ids_message_received.blockchain_synopsis.end(),
|
||||
reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end()
|
||||
)
|
||||
reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end() )
|
||||
{
|
||||
/* the last item in the peer's list matches the last item in our list */
|
||||
originating_peer->peer_needs_sync_items_from_us = false;
|
||||
|
|
@ -2197,7 +2213,6 @@ namespace graphene { namespace net { namespace detail {
|
|||
}
|
||||
else
|
||||
{
|
||||
//dlog( "sync: peer is out of sync, sending peer ${count} items ids: ${item_ids}", ("count", reply_message.item_hashes_available.size() )("item_ids", reply_message.item_hashes_available ) );
|
||||
dlog("sync: peer is out of sync, sending peer ${count} items ids: first: ${first_item_id}, last: ${last_item_id}",
|
||||
("count", reply_message.item_hashes_available.size())
|
||||
("first_item_id", reply_message.item_hashes_available.front())
|
||||
|
|
@ -2270,24 +2285,35 @@ namespace graphene { namespace net { namespace detail {
|
|||
// This is pretty expensive, we should find a better way to do this
|
||||
std::unique_ptr<std::vector<item_hash_t> > original_ids_of_items_to_get(new std::vector<item_hash_t>(peer->ids_of_items_to_get.begin(), peer->ids_of_items_to_get.end()));
|
||||
|
||||
std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(_sync_item_type, reference_point, number_of_blocks_after_reference_point);
|
||||
assert(reference_point == item_hash_t() || !synopsis.empty());
|
||||
std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
|
||||
|
||||
// if we passed in a reference point, we believe it is one the client has already accepted and should
|
||||
// be able to generate a synopsis based on it
|
||||
if( reference_point != item_hash_t() && synopsis.empty() )
|
||||
synopsis = _delegate->get_blockchain_synopsis( _sync_item_type, reference_point, number_of_blocks_after_reference_point );
|
||||
#if 0
|
||||
// just for debugging, enable this and set a breakpoint to step through
|
||||
if (synopsis.empty())
|
||||
synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
|
||||
|
||||
// TODO: it's possible that the returned synopsis is empty if the blockchain is empty (that's fine)
|
||||
// or if the reference point is now past our undo history (that's not).
|
||||
// in the second case, we should mark this peer as one we're unable to sync with and
|
||||
// disconnect them.
|
||||
if (reference_point != item_hash_t() && synopsis.empty())
|
||||
FC_THROW_EXCEPTION(block_older_than_undo_history, "You are on a fork I'm unable to switch to");
|
||||
#endif
|
||||
|
||||
if( number_of_blocks_after_reference_point )
|
||||
{
|
||||
// then the synopsis is incomplete, add the missing elements from ids_of_items_to_get
|
||||
uint32_t true_high_block_num = reference_point_block_num + number_of_blocks_after_reference_point;
|
||||
uint32_t low_block_num = 1;
|
||||
|
||||
// in order to generate a seamless synopsis, we need to be using the same low_block_num as the
|
||||
// backend code; the first block in the synopsis will be the low block number it used
|
||||
uint32_t low_block_num = synopsis.empty() ? 1 : _delegate->get_block_number(synopsis.front());
|
||||
|
||||
do
|
||||
{
|
||||
if( low_block_num > reference_point_block_num )
|
||||
synopsis.push_back( (*original_ids_of_items_to_get)[low_block_num - reference_point_block_num - 1] );
|
||||
low_block_num += ( (true_high_block_num - low_block_num + 2 ) / 2 );
|
||||
synopsis.push_back((*original_ids_of_items_to_get)[low_block_num - reference_point_block_num - 1]);
|
||||
low_block_num += (true_high_block_num - low_block_num + 2 ) / 2;
|
||||
}
|
||||
while ( low_block_num <= true_high_block_num );
|
||||
assert(synopsis.back() == original_ids_of_items_to_get->back());
|
||||
|
|
@ -2305,14 +2331,25 @@ namespace graphene { namespace net { namespace detail {
|
|||
peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
|
||||
}
|
||||
|
||||
std::vector<item_hash_t> blockchain_synopsis = create_blockchain_synopsis_for_peer( peer );
|
||||
item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back();
|
||||
dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})",
|
||||
( "last_item_seen", last_item_seen )
|
||||
( "peer", peer->get_remote_endpoint() )
|
||||
( "blockchain_synopsis", blockchain_synopsis ) );
|
||||
peer->item_ids_requested_from_peer = boost::make_tuple( item_id(_sync_item_type, last_item_seen ), fc::time_point::now() );
|
||||
peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) );
|
||||
fc::oexception synopsis_exception;
|
||||
try
|
||||
{
|
||||
std::vector<item_hash_t> blockchain_synopsis = create_blockchain_synopsis_for_peer( peer );
|
||||
|
||||
item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back();
|
||||
dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})",
|
||||
( "last_item_seen", last_item_seen )
|
||||
( "peer", peer->get_remote_endpoint() )
|
||||
( "blockchain_synopsis", blockchain_synopsis ) );
|
||||
peer->item_ids_requested_from_peer = boost::make_tuple( blockchain_synopsis, fc::time_point::now() );
|
||||
peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) );
|
||||
}
|
||||
catch (const block_older_than_undo_history& e)
|
||||
{
|
||||
synopsis_exception = e;
|
||||
}
|
||||
if (synopsis_exception)
|
||||
disconnect_from_peer(peer, "You are on a fork I'm unable to switch to");
|
||||
}
|
||||
|
||||
void node_impl::on_blockchain_item_ids_inventory_message(peer_connection* originating_peer,
|
||||
|
|
@ -2322,6 +2359,58 @@ namespace graphene { namespace net { namespace detail {
|
|||
// ignore unless we asked for the data
|
||||
if( originating_peer->item_ids_requested_from_peer )
|
||||
{
|
||||
// verify that the peer's the block ids the peer sent is a valid response to our request;
|
||||
// It should either be an empty list of blocks, or a list of blocks that builds off of one of
|
||||
// the blocks in the synopsis we sent
|
||||
if (!blockchain_item_ids_inventory_message_received.item_hashes_available.empty())
|
||||
{
|
||||
const std::vector<item_hash_t>& synopsis_sent_in_request = originating_peer->item_ids_requested_from_peer->get<0>();
|
||||
const item_hash_t& first_item_hash = blockchain_item_ids_inventory_message_received.item_hashes_available.front();
|
||||
|
||||
if (synopsis_sent_in_request.empty())
|
||||
{
|
||||
// if we sent an empty synopsis, we were asking for all blocks, so the first block should be block 1
|
||||
if (_delegate->get_block_number(first_item_hash) != 1)
|
||||
{
|
||||
wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks starting from the beginning of the chain, "
|
||||
"but they provided a list of blocks starting with ${first_block}",
|
||||
("peer_endpoint", originating_peer->get_remote_endpoint())
|
||||
("first_block", first_item_hash));
|
||||
// TODO: enable these once committed
|
||||
//fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks starting from the beginning of the chain, "
|
||||
// "but you returned a list of blocks starting with ${first_block}",
|
||||
// ("first_block", first_item_hash)));
|
||||
//disconnect_from_peer(originating_peer,
|
||||
// "You gave an invalid response to my request for sync blocks",
|
||||
// true, error_for_peer);
|
||||
disconnect_from_peer(originating_peer,
|
||||
"You gave an invalid response to my request for sync blocks");
|
||||
return;
|
||||
}
|
||||
}
|
||||
else // synopsis was not empty, we expect a response building off one of the blocks we sent
|
||||
{
|
||||
if (boost::range::find(synopsis_sent_in_request, first_item_hash) == synopsis_sent_in_request.end())
|
||||
{
|
||||
wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks based on the synopsis ${synopsis}, but they "
|
||||
"provided a list of blocks starting with ${first_block}",
|
||||
("peer_endpoint", originating_peer->get_remote_endpoint())
|
||||
("synopsis", synopsis_sent_in_request)
|
||||
("first_block", first_item_hash));
|
||||
// TODO: enable these once committed
|
||||
//fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks following something in "
|
||||
// "${synopsis}, but you returned a list of blocks starting with ${first_block} which wasn't one of your choices",
|
||||
// ("synopsis", synopsis_sent_in_request)
|
||||
// ("first_block", first_item_hash)));
|
||||
//disconnect_from_peer(originating_peer,
|
||||
// "You gave an invalid response to my request for sync blocks",
|
||||
// true, error_for_peer);
|
||||
disconnect_from_peer(originating_peer,
|
||||
"You gave an invalid response to my request for sync blocks");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
originating_peer->item_ids_requested_from_peer.reset();
|
||||
|
||||
dlog( "sync: received a list of ${count} available items from ${peer_endpoint}",
|
||||
|
|
@ -5185,12 +5274,11 @@ namespace graphene { namespace net { namespace detail {
|
|||
INVOKE_AND_COLLECT_STATISTICS(handle_transaction, transaction_message);
|
||||
}
|
||||
|
||||
std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_item_ids(uint32_t item_type,
|
||||
const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit /* = 2000 */)
|
||||
std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_block_ids(const std::vector<item_hash_t>& blockchain_synopsis,
|
||||
uint32_t& remaining_item_count,
|
||||
uint32_t limit /* = 2000 */)
|
||||
{
|
||||
INVOKE_AND_COLLECT_STATISTICS(get_item_ids, item_type, blockchain_synopsis, remaining_item_count, limit);
|
||||
INVOKE_AND_COLLECT_STATISTICS(get_block_ids, blockchain_synopsis, remaining_item_count, limit);
|
||||
}
|
||||
|
||||
message statistics_gathering_node_delegate_wrapper::get_item( const item_id& id )
|
||||
|
|
@ -5203,11 +5291,9 @@ namespace graphene { namespace net { namespace detail {
|
|||
INVOKE_AND_COLLECT_STATISTICS(get_chain_id);
|
||||
}
|
||||
|
||||
std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(uint32_t item_type,
|
||||
const graphene::net::item_hash_t& reference_point /* = graphene::net::item_hash_t() */,
|
||||
uint32_t number_of_blocks_after_reference_point /* = 0 */)
|
||||
std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(const item_hash_t& reference_point, uint32_t number_of_blocks_after_reference_point)
|
||||
{
|
||||
INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, item_type, reference_point, number_of_blocks_after_reference_point);
|
||||
INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, reference_point, number_of_blocks_after_reference_point);
|
||||
}
|
||||
|
||||
void statistics_gathering_node_delegate_wrapper::sync_status( uint32_t item_type, uint32_t item_count )
|
||||
|
|
|
|||
|
|
@ -276,8 +276,8 @@ namespace graphene { namespace net
|
|||
#ifndef NDEBUG
|
||||
struct counter {
|
||||
unsigned& _send_message_queue_tasks_counter;
|
||||
counter(unsigned& var) : _send_message_queue_tasks_counter(var) { dlog("entering peer_connection::send_queued_messages_task()"); assert(_send_message_queue_tasks_counter == 0); ++_send_message_queue_tasks_counter; }
|
||||
~counter() { assert(_send_message_queue_tasks_counter == 1); --_send_message_queue_tasks_counter; dlog("leaving peer_connection::send_queued_messages_task()"); }
|
||||
counter(unsigned& var) : _send_message_queue_tasks_counter(var) { /* dlog("entering peer_connection::send_queued_messages_task()"); */ assert(_send_message_queue_tasks_counter == 0); ++_send_message_queue_tasks_counter; }
|
||||
~counter() { assert(_send_message_queue_tasks_counter == 1); --_send_message_queue_tasks_counter; /* dlog("leaving peer_connection::send_queued_messages_task()"); */ }
|
||||
} concurrent_invocation_counter(_send_message_queue_tasks_running);
|
||||
#endif
|
||||
while (!_queued_messages.empty())
|
||||
|
|
@ -286,12 +286,12 @@ namespace graphene { namespace net
|
|||
message message_to_send = _queued_messages.front()->get_message(_node);
|
||||
try
|
||||
{
|
||||
dlog("peer_connection::send_queued_messages_task() calling message_oriented_connection::send_message() "
|
||||
"to send message of type ${type} for peer ${endpoint}",
|
||||
("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
|
||||
//dlog("peer_connection::send_queued_messages_task() calling message_oriented_connection::send_message() "
|
||||
// "to send message of type ${type} for peer ${endpoint}",
|
||||
// ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
|
||||
_message_connection.send_message(message_to_send);
|
||||
dlog("peer_connection::send_queued_messages_task()'s call to message_oriented_connection::send_message() completed normally for peer ${endpoint}",
|
||||
("endpoint", get_remote_endpoint()));
|
||||
//dlog("peer_connection::send_queued_messages_task()'s call to message_oriented_connection::send_message() completed normally for peer ${endpoint}",
|
||||
// ("endpoint", get_remote_endpoint()));
|
||||
}
|
||||
catch (const fc::canceled_exception&)
|
||||
{
|
||||
|
|
@ -323,7 +323,7 @@ namespace graphene { namespace net
|
|||
_total_queued_messages_size -= _queued_messages.front()->get_size_in_queue();
|
||||
_queued_messages.pop();
|
||||
}
|
||||
dlog("leaving peer_connection::send_queued_messages_task() due to queue exhaustion");
|
||||
//dlog("leaving peer_connection::send_queued_messages_task() due to queue exhaustion");
|
||||
}
|
||||
|
||||
void peer_connection::send_queueable_message(std::unique_ptr<queued_message>&& message_to_send)
|
||||
|
|
@ -351,18 +351,18 @@ namespace graphene { namespace net
|
|||
|
||||
if (!_send_queued_messages_done.valid() || _send_queued_messages_done.ready())
|
||||
{
|
||||
dlog("peer_connection::send_message() is firing up send_queued_message_task");
|
||||
//dlog("peer_connection::send_message() is firing up send_queued_message_task");
|
||||
_send_queued_messages_done = fc::async([this](){ send_queued_messages_task(); }, "send_queued_messages_task");
|
||||
}
|
||||
else
|
||||
dlog("peer_connection::send_message() doesn't need to fire up send_queued_message_task, it's already running");
|
||||
//else
|
||||
// dlog("peer_connection::send_message() doesn't need to fire up send_queued_message_task, it's already running");
|
||||
}
|
||||
|
||||
void peer_connection::send_message(const message& message_to_send, size_t message_send_time_field_offset)
|
||||
{
|
||||
VERIFY_CORRECT_THREAD();
|
||||
dlog("peer_connection::send_message() enqueueing message of type ${type} for peer ${endpoint}",
|
||||
("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
|
||||
//dlog("peer_connection::send_message() enqueueing message of type ${type} for peer ${endpoint}",
|
||||
// ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
|
||||
std::unique_ptr<queued_message> message_to_enqueue(new real_queued_message(message_to_send, message_send_time_field_offset));
|
||||
send_queueable_message(std::move(message_to_enqueue));
|
||||
}
|
||||
|
|
@ -370,8 +370,8 @@ namespace graphene { namespace net
|
|||
void peer_connection::send_item(const item_id& item_to_send)
|
||||
{
|
||||
VERIFY_CORRECT_THREAD();
|
||||
dlog("peer_connection::send_item() enqueueing message of type ${type} for peer ${endpoint}",
|
||||
("type", item_to_send.item_type)("endpoint", get_remote_endpoint()));
|
||||
//dlog("peer_connection::send_item() enqueueing message of type ${type} for peer ${endpoint}",
|
||||
// ("type", item_to_send.item_type)("endpoint", get_remote_endpoint()));
|
||||
std::unique_ptr<queued_message> message_to_enqueue(new virtual_queued_message(item_to_send));
|
||||
send_queueable_message(std::move(message_to_enqueue));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1076,7 +1076,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture )
|
|||
}
|
||||
};
|
||||
|
||||
auto generate_xfer_tx = [&]( account_id_type from, account_id_type to, share_type amount, int blocks_to_expire=10 ) -> signed_transaction
|
||||
auto generate_xfer_tx = [&]( account_id_type from, account_id_type to, share_type amount, int blocks_to_expire ) -> signed_transaction
|
||||
{
|
||||
signed_transaction tx;
|
||||
transfer_operation xfer_op;
|
||||
|
|
@ -1128,8 +1128,8 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture )
|
|||
//
|
||||
|
||||
signed_transaction tx_a = generate_xfer_tx( bob_id, alice_id, 1000, 3 );
|
||||
signed_transaction tx_b = generate_xfer_tx( alice_id, bob_id, 2000 );
|
||||
signed_transaction tx_c = generate_xfer_tx( alice_id, bob_id, 500 );
|
||||
signed_transaction tx_b = generate_xfer_tx( alice_id, bob_id, 2000, 10 );
|
||||
signed_transaction tx_c = generate_xfer_tx( alice_id, bob_id, 500, 10 );
|
||||
|
||||
generate_block( db );
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue