When the p2p code processes a block that contains transactions

we haven't seen, avoid fetching those transactions separately
This commit is contained in:
Eric Frias 2015-07-16 15:30:05 -04:00
parent 984cf47841
commit 11a5d2b620
5 changed files with 111 additions and 49 deletions

View file

@ -312,11 +312,29 @@ namespace detail {
* *
* @throws exception if error validating the item, otherwise the item is safe to broadcast on. * @throws exception if error validating the item, otherwise the item is safe to broadcast on.
*/ */
virtual bool handle_block(const graphene::net::block_message& blk_msg, bool sync_mode) override virtual bool handle_block(const graphene::net::block_message& blk_msg, bool sync_mode,
std::vector<fc::uint160_t>& contained_transaction_message_ids) override
{ try { { try {
ilog("Got block #${n} from network", ("n", blk_msg.block.block_num())); ilog("Got block #${n} from network", ("n", blk_msg.block.block_num()));
try { try {
return _chain_db->push_block(blk_msg.block, _is_block_producer? database::skip_nothing : database::skip_transaction_signatures); bool result = _chain_db->push_block(blk_msg.block, _is_block_producer ? database::skip_nothing : database::skip_transaction_signatures);
// the block was accepted, so we now know all of the transactions contained in the block
if (!sync_mode)
{
// if we're not in sync mode, there's a chance we will be seeing some transactions
// included in blocks before we see the free-floating transaction itself. If that
// happens, there's no reason to fetch the transactions, so construct a list of the
// transaction message ids we no longer need.
// during sync, it is unlikely that we'll see any old
for (const processed_transaction& transaction : blk_msg.block.transactions)
{
graphene::net::trx_message transaction_message(transaction);
contained_transaction_message_ids.push_back(graphene::net::message(transaction_message).id());
}
}
return result;
} catch( const fc::exception& e ) { } catch( const fc::exception& e ) {
elog("Error when pushing block:\n${e}", ("e", e.to_detail_string())); elog("Error when pushing block:\n${e}", ("e", e.to_detail_string()));
throw; throw;
@ -329,12 +347,17 @@ namespace detail {
} }
} FC_CAPTURE_AND_RETHROW( (blk_msg)(sync_mode) ) } } FC_CAPTURE_AND_RETHROW( (blk_msg)(sync_mode) ) }
virtual bool handle_transaction(const graphene::net::trx_message& trx_msg, bool sync_mode) override virtual void handle_transaction(const graphene::net::trx_message& transaction_message) override
{ try { { try {
ilog("Got transaction from network"); ilog("Got transaction from network");
_chain_db->push_transaction( trx_msg.trx ); _chain_db->push_transaction( transaction_message.trx );
return false; } FC_CAPTURE_AND_RETHROW( (transaction_message) ) }
} FC_CAPTURE_AND_RETHROW( (trx_msg)(sync_mode) ) }
virtual void handle_message(const message& message_to_process) override
{
// not a transaction, not a block
FC_THROW( "Invalid Message Type" );
}
/** /**
* Assuming all data elements are ordered in some way, this method should * Assuming all data elements are ordered in some way, this method should

View file

@ -97,7 +97,7 @@ bool database::_push_block(const signed_block& new_block)
uint32_t skip = get_node_properties().skip_flags; uint32_t skip = get_node_properties().skip_flags;
if( !(skip&skip_fork_db) ) if( !(skip&skip_fork_db) )
{ {
auto new_head = _fork_db.push_block(new_block); shared_ptr<fork_item> new_head = _fork_db.push_block(new_block);
//If the head block from the longest chain does not build off of the current head, we need to switch forks. //If the head block from the longest chain does not build off of the current head, we need to switch forks.
if( new_head->data.previous != head_block_id() ) if( new_head->data.previous != head_block_id() )
{ {
@ -116,7 +116,7 @@ bool database::_push_block(const signed_block& new_block)
{ {
optional<fc::exception> except; optional<fc::exception> except;
try { try {
auto session = _undo_db.start_undo_session(); undo_database::session session = _undo_db.start_undo_session();
apply_block( (*ritr)->data, skip ); apply_block( (*ritr)->data, skip );
_block_id_to_block.store( (*ritr)->id, (*ritr)->data ); _block_id_to_block.store( (*ritr)->id, (*ritr)->data );
session.commit(); session.commit();

View file

@ -62,8 +62,7 @@ namespace graphene { namespace net {
virtual bool has_item( const net::item_id& id ) = 0; virtual bool has_item( const net::item_id& id ) = 0;
/** /**
* @brief allows the application to validate an item prior to * @brief Called when a new block comes in from the network
* broadcasting to peers.
* *
* @param sync_mode true if the message was fetched through the sync process, false during normal operation * @param sync_mode true if the message was fetched through the sync process, false during normal operation
* @returns true if this message caused the blockchain to switch forks, false if it did not * @returns true if this message caused the blockchain to switch forks, false if it did not
@ -71,21 +70,26 @@ namespace graphene { namespace net {
* @throws exception if error validating the item, otherwise the item is * @throws exception if error validating the item, otherwise the item is
* safe to broadcast on. * safe to broadcast on.
*/ */
virtual bool handle_block( const graphene::net::block_message& blk_msg, bool syncmode ) = 0; virtual bool handle_block( const graphene::net::block_message& blk_msg, bool sync_mode,
virtual bool handle_transaction( const graphene::net::trx_message& trx_msg, bool syncmode ) = 0; std::vector<fc::uint160_t>& contained_transaction_message_ids ) = 0;
/**
* @brief Called when a new transaction comes in from the network
*
* @throws exception if error validating the item, otherwise the item is
* safe to broadcast on.
*/
virtual void handle_transaction( const graphene::net::trx_message& trx_msg ) = 0;
virtual bool handle_message( const message& message_to_process, bool sync_mode ) /**
{ * @brief Called when a new message comes in from the network other than a
switch( message_to_process.msg_type ) * block or a transaction. Currently there are no other possible
{ * messages, so this should never be called.
case block_message_type: *
return handle_block(message_to_process.as<block_message>(), sync_mode); * @throws exception if error validating the item, otherwise the item is
case trx_message_type: * safe to broadcast on.
return handle_transaction(message_to_process.as<trx_message>(), sync_mode); */
default: virtual void handle_message( const message& message_to_process ) = 0;
FC_THROW( "Invalid Message Type" );
};
}
/** /**
* Assuming all data elements are ordered in some way, this method should * Assuming all data elements are ordered in some way, this method should

View file

@ -276,6 +276,8 @@ namespace graphene { namespace net { namespace detail {
boost::accumulators::tag::count> > call_stats_accumulator; boost::accumulators::tag::count> > call_stats_accumulator;
#define NODE_DELEGATE_METHOD_NAMES (has_item) \ #define NODE_DELEGATE_METHOD_NAMES (has_item) \
(handle_message) \ (handle_message) \
(handle_block) \
(handle_transaction) \
(get_item_ids) \ (get_item_ids) \
(get_item) \ (get_item) \
(get_chain_id) \ (get_chain_id) \
@ -367,9 +369,9 @@ namespace graphene { namespace net { namespace detail {
fc::variant_object get_call_statistics(); fc::variant_object get_call_statistics();
bool has_item( const net::item_id& id ) override; bool has_item( const net::item_id& id ) override;
bool handle_message( const message&, bool sync_mode ) override; void handle_message( const message& ) override;
bool handle_block( const graphene::net::block_message& blk_msg, bool syncmode ) override; bool handle_block( const graphene::net::block_message& block_message, bool sync_mode, std::vector<fc::uint160_t>& contained_transaction_message_ids ) override;
bool handle_transaction( const graphene::net::trx_message& trx_msg, bool syncmode ) override; void handle_transaction( const graphene::net::trx_message& transaction_message ) override;
std::vector<item_hash_t> get_item_ids(uint32_t item_type, std::vector<item_hash_t> get_item_ids(uint32_t item_type,
const std::vector<item_hash_t>& blockchain_synopsis, const std::vector<item_hash_t>& blockchain_synopsis,
uint32_t& remaining_item_count, uint32_t& remaining_item_count,
@ -448,9 +450,11 @@ namespace graphene { namespace net { namespace detail {
bool _items_to_fetch_updated; bool _items_to_fetch_updated;
fc::future<void> _fetch_item_loop_done; fc::future<void> _fetch_item_loop_done;
struct item_id_index{};
typedef boost::multi_index_container<prioritized_item_id, typedef boost::multi_index_container<prioritized_item_id,
boost::multi_index::indexed_by<boost::multi_index::ordered_unique<boost::multi_index::identity<prioritized_item_id> >, boost::multi_index::indexed_by<boost::multi_index::ordered_unique<boost::multi_index::identity<prioritized_item_id> >,
boost::multi_index::hashed_unique<boost::multi_index::member<prioritized_item_id, item_id, &prioritized_item_id::item>, boost::multi_index::hashed_unique<boost::multi_index::tag<item_id_index>,
boost::multi_index::member<prioritized_item_id, item_id, &prioritized_item_id::item>,
std::hash<item_id> > > std::hash<item_id> > >
> items_to_fetch_set_type; > items_to_fetch_set_type;
unsigned _items_to_fetch_sequence_counter; unsigned _items_to_fetch_sequence_counter;
@ -2777,7 +2781,8 @@ namespace graphene { namespace net { namespace detail {
try try
{ {
_delegate->handle_block(block_message_to_send, true); std::vector<fc::uint160_t> contained_transaction_message_ids;
_delegate->handle_block(block_message_to_send, true, contained_transaction_message_ids);
ilog("Successfully pushed sync block ${num} (id:${id})", ilog("Successfully pushed sync block ${num} (id:${id})",
("num", block_message_to_send.block.block_num()) ("num", block_message_to_send.block.block_num())
("id", block_message_to_send.block_id)); ("id", block_message_to_send.block_id));
@ -3095,12 +3100,31 @@ namespace graphene { namespace net { namespace detail {
if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(), if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(),
block_message_to_process.block_id) == _most_recent_blocks_accepted.end()) block_message_to_process.block_id) == _most_recent_blocks_accepted.end())
{ {
_delegate->handle_block(block_message_to_process, false); std::vector<fc::uint160_t> contained_transaction_message_ids;
_delegate->handle_block(block_message_to_process, false, contained_transaction_message_ids);
message_validated_time = fc::time_point::now(); message_validated_time = fc::time_point::now();
ilog("Successfully pushed block ${num} (id:${id})", ilog("Successfully pushed block ${num} (id:${id})",
("num", block_message_to_process.block.block_num()) ("num", block_message_to_process.block.block_num())
("id", block_message_to_process.block_id)); ("id", block_message_to_process.block_id));
_most_recent_blocks_accepted.push_back(block_message_to_process.block_id); _most_recent_blocks_accepted.push_back(block_message_to_process.block_id);
bool new_transaction_discovered = false;
for (const item_hash_t& transaction_message_hash : contained_transaction_message_ids)
{
size_t items_erased = _items_to_fetch.get<item_id_index>().erase(item_id(trx_message_type, transaction_message_hash));
// there are two ways we could behave here: we could either act as if we received
// the transaction outside the block and offer it to our peers, or we could just
// forget about it (we would still advertise this block to our peers so they should
// get the transaction through that mechanism).
// We take the second approach, bring in the next if block to try the first approach
//if (items_erased)
//{
// new_transaction_discovered = true;
// _new_inventory.insert(item_id(trx_message_type, transaction_message_hash));
//}
}
if (new_transaction_discovered)
trigger_advertise_inventory_loop();
} }
else else
dlog( "Already received and accepted this block (presumably through sync mechanism), treating it as accepted" ); dlog( "Already received and accepted this block (presumably through sync mechanism), treating it as accepted" );
@ -3532,7 +3556,10 @@ namespace graphene { namespace net { namespace detail {
fc::time_point message_validated_time; fc::time_point message_validated_time;
try try
{ {
_delegate->handle_message( message_to_process, false ); if (message_to_process.msg_type == trx_message_type)
_delegate->handle_transaction( message_to_process.as<trx_message>() );
else
_delegate->handle_message( message_to_process );
message_validated_time = fc::time_point::now(); message_validated_time = fc::time_point::now();
} }
catch ( const insufficient_relay_fee& ) catch ( const insufficient_relay_fee& )
@ -4914,8 +4941,8 @@ namespace graphene { namespace net { namespace detail {
INVOKE_IN_IMPL(clear_peer_database); INVOKE_IN_IMPL(clear_peer_database);
} }
void node::set_total_bandwidth_limit( uint32_t upload_bytes_per_second, void node::set_total_bandwidth_limit(uint32_t upload_bytes_per_second,
uint32_t download_bytes_per_second ) uint32_t download_bytes_per_second)
{ {
INVOKE_IN_IMPL(set_total_bandwidth_limit, upload_bytes_per_second, download_bytes_per_second); INVOKE_IN_IMPL(set_total_bandwidth_limit, upload_bytes_per_second, download_bytes_per_second);
} }
@ -4968,7 +4995,16 @@ namespace graphene { namespace net { namespace detail {
{ {
try try
{ {
destination_node->delegate->handle_message(destination_node->messages_to_deliver.front(), false); const message& message_to_deliver = destination_node->messages_to_deliver.front();
if (message_to_deliver.msg_type == trx_message_type)
destination_node->delegate->handle_transaction(message_to_deliver.as<trx_message>());
else if (message_to_deliver.msg_type == block_message_type)
{
std::vector<fc::uint160_t> contained_transaction_message_ids;
destination_node->delegate->handle_block(message_to_deliver.as<block_message>(), false, contained_transaction_message_ids);
}
else
destination_node->delegate->handle_message(message_to_deliver);
} }
catch ( const fc::exception& e ) catch ( const fc::exception& e )
{ {
@ -5098,27 +5134,25 @@ namespace graphene { namespace net { namespace detail {
INVOKE_AND_COLLECT_STATISTICS(has_item, id); INVOKE_AND_COLLECT_STATISTICS(has_item, id);
} }
bool statistics_gathering_node_delegate_wrapper::handle_message( const message& message_to_handle, bool sync_mode ) void statistics_gathering_node_delegate_wrapper::handle_message( const message& message_to_handle )
{ {
INVOKE_AND_COLLECT_STATISTICS(handle_message, message_to_handle, sync_mode); INVOKE_AND_COLLECT_STATISTICS(handle_message, message_to_handle);
} }
bool statistics_gathering_node_delegate_wrapper::handle_block( const graphene::net::block_message& blk_msg, bool syncmode ) bool statistics_gathering_node_delegate_wrapper::handle_block( const graphene::net::block_message& block_message, bool sync_mode, std::vector<fc::uint160_t>& contained_transaction_message_ids)
{ {
if (_thread->is_current()) { return _node_delegate->handle_block(blk_msg,syncmode); } INVOKE_AND_COLLECT_STATISTICS(handle_block, block_message, sync_mode, contained_transaction_message_ids);
else return _thread->async([&](){ return _node_delegate->handle_block(blk_msg,syncmode); }, "invoke handle_block").wait();
} }
bool statistics_gathering_node_delegate_wrapper::handle_transaction( const graphene::net::trx_message& trx_msg, bool syncmode ) void statistics_gathering_node_delegate_wrapper::handle_transaction( const graphene::net::trx_message& transaction_message )
{ {
if (_thread->is_current()) { return _node_delegate->handle_transaction(trx_msg,syncmode); } INVOKE_AND_COLLECT_STATISTICS(handle_transaction, transaction_message);
else return _thread->async([&](){ return _node_delegate->handle_transaction(trx_msg,syncmode); }, "invoke handle_transaction").wait();
} }
std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_item_ids(uint32_t item_type, std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_item_ids(uint32_t item_type,
const std::vector<item_hash_t>& blockchain_synopsis, const std::vector<item_hash_t>& blockchain_synopsis,
uint32_t& remaining_item_count, uint32_t& remaining_item_count,
uint32_t limit /* = 2000 */) uint32_t limit /* = 2000 */)
{ {
INVOKE_AND_COLLECT_STATISTICS(get_item_ids, item_type, blockchain_synopsis, remaining_item_count, limit); INVOKE_AND_COLLECT_STATISTICS(get_item_ids, item_type, blockchain_synopsis, remaining_item_count, limit);
} }
@ -5134,8 +5168,8 @@ namespace graphene { namespace net { namespace detail {
} }
std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(uint32_t item_type, std::vector<item_hash_t> statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(uint32_t item_type,
const graphene::net::item_hash_t& reference_point /* = graphene::net::item_hash_t() */, const graphene::net::item_hash_t& reference_point /* = graphene::net::item_hash_t() */,
uint32_t number_of_blocks_after_reference_point /* = 0 */) uint32_t number_of_blocks_after_reference_point /* = 0 */)
{ {
INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, item_type, reference_point, number_of_blocks_after_reference_point); INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, item_type, reference_point, number_of_blocks_after_reference_point);
} }

View file

@ -1707,8 +1707,9 @@ public:
dbg_make_uia(master.name, "SHILL"); dbg_make_uia(master.name, "SHILL");
} catch(...) {/* Ignore; the asset probably already exists.*/} } catch(...) {/* Ignore; the asset probably already exists.*/}
fc::time_point start = fc::time_point::now(); for( int i = 0; i < number_of_accounts; ++i ) fc::time_point start = fc::time_point::now();
create_account_with_private_key(key, prefix + fc::to_string(i), master.name, master.name, true, false); for( int i = 0; i < number_of_accounts; ++i )
create_account_with_private_key(key, prefix + fc::to_string(i), master.name, master.name, /* broadcast = */ true, /* save wallet = */ false);
fc::time_point end = fc::time_point::now(); fc::time_point end = fc::time_point::now();
ilog("Created ${n} accounts in ${time} milliseconds", ilog("Created ${n} accounts in ${time} milliseconds",
("n", number_of_accounts)("time", (end - start).count() / 1000)); ("n", number_of_accounts)("time", (end - start).count() / 1000));