peerplays_migrated/libraries/net/peer_connection.cpp
Commit 4d836dacb9 by Nathan Hourt: "Ref !3/#376: Graphene Updates"
This adds the most important updates to Graphene from BitShares. Most notably,
https://github.com/bitshares/bitshares-core/issues/1506

Second most notably, it updates Peerplays' FC to be in sync with BitShares FC.

This is a squash commit of several subcommits. The subcommit messages are
reproduced below:

Replace fc::uint128 with boost::multiprecision::uint128_t

replace smart_ref with shared_ptr

Fixes/Remove Unused

Remove NTP time

Remove old macro

This macro is now in FC, so no need to define it here anymore

Replaced fc::array with std::array

Separate exception declaration and implementation

Adapted to fc promise changes

Fixes

Add back in some of Peter's fixes that got lost in the cherry pick

_hash endianness fixes

Remove all uses of fc/smart_ref

It's gone, can't use it anymore

Replace improper static_variant operator overloads with comparators

Fixes

Remove boost::signals from build system; it's header-only so it's not
listed in cmake anymore.

Also remove some unused hashing code

Impl. pack/unpack functions for extension class

Ref #1506: Isolate chain/protocol to its own library

Ref #1506: Add object_downcast_t

Allows the more concise expression `object_downcast_t<xyz>` instead of
the old `typename object_downcast<xyz>::type`
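
A minimal, self-contained illustration of what such an alias looks like (the object types and the object_downcast specialization here are toy stand-ins, not the real Graphene definitions):

    #include <type_traits>

    struct account_object {};
    struct account_id_type {};

    // Toy metafunction mapping an ID type to its object type, standing in for Graphene's object_downcast.
    template<typename IDType> struct object_downcast;
    template<> struct object_downcast<account_id_type> { using type = account_object; };

    // The alias template described above removes the `typename ...::type` noise at call sites.
    template<typename IDType>
    using object_downcast_t = typename object_downcast<IDType>::type;

    static_assert(std::is_same<object_downcast_t<account_id_type>, account_object>::value,
                  "object_downcast_t resolves the ID type to its object type");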

Ref #1506: Move ID types from db to protocol

The ID types, object_id and object_id_type, were defined in the db
library, and the protocol library depends on db to get these types.
Technically, the ID types are defined by the protocol and used by the
database, and not vice versa. Therefore these types should be in the
protocol library, and db should depend on protocol to get them.

This commit makes it so.

Ref #1506: Isolate chain/protocol to its own library

Remove commented-out index code

Wrap overlength line

Remove unused key types

Probably fix Docker build

Fix build after rebase

Ref #1506/#1737: Some requested changes

Ref #1506/#1737: Macro-fy ID type definitions

Define macros to fully de-boilerplate ID type definitions.

Externalities:
 - Rename transaction_object -> transaction_history_object
 - Rename impl_asset_dynamic_data_type -> impl_asset_dynamic_data_object_type
 - Rename impl_asset_bitasset_data_type -> impl_asset_bitasset_data_object_type

The first is to avoid a naming collision on transaction_id_type, and the
other two are to maintain consistency with the naming of the other
types.
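
A purely illustrative, self-contained sketch of the boilerplate such a macro removes (the macro name, the toy object_id template, and the space/type values are hypothetical, not the actual definitions from #1737):

    #include <cstdint>

    // Toy stand-in for Graphene's object_id template.
    template<uint8_t SpaceID, uint8_t TypeID>
    struct object_id { uint64_t instance = 0; };

    // Hypothetical macro: one line per ID type instead of a hand-written alias every time.
    #define DEFINE_ID_TYPE( space, type, name ) \
       using name ## _id_type = object_id< (space), (type) >;

    DEFINE_ID_TYPE( 1, 2, account )  // expands to: using account_id_type = object_id<1, 2>;
    DEFINE_ID_TYPE( 1, 3, asset )    // expands to: using asset_id_type   = object_id<1, 3>;

    int main() { account_id_type id; id.instance = 42; return id.instance == 42 ? 0 : 1; }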

Ref #1506/#1737: Fix clean_name()

Ref #1506/#1737: Oops

Fix .gitignore

Externalized serialization in protocol library

Fix compile sets

Delete a couple of ghost files that were in the tree but not part of the project (I accidentally added them to CMakeLists while merging, and they're broken and not part of the Peerplays code), and add back several files that got dropped from the build during the merge.

General fixes

Fix warnings, build issues, unused code, etc.

Fix #1772 by deprecating cli_wallet -H

More fixes

Fix errors and warnings and generally coax it to build

Fix test

I'm pretty sure this didn't break from what I did... But I can't build
the original code, so I can't tell. Anyways, this one now passes...
Others still fail...

Small fix

Fix crash in auth checks

Final fixes

Last round of fixes following the rebase to Beatrice

Rename project in CMakeLists.txt

The CMakeLists.txt declared this project as BitShares and not Peerplays,
which makes it confusing in IDEs. Rename it to be clear which project is
open.

Resolve #374

Replace all object refs in macros with IDs, and fix affected tests to look
up objects by ID rather than using invalidated refs.

A full audit of all tests should be performed to eliminate any further
usage of invalidated object references.
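
A self-contained toy showing the hazard (the invalidation mechanism in Graphene's database differs, but the pattern is the same: hold IDs and re-fetch objects rather than keeping references across state changes):

    #include <cassert>
    #include <vector>

    struct toy_account { int id; int balance; };

    struct toy_db
    {
       // Storing objects by value means growth can invalidate references into the container.
       std::vector<toy_account> accounts;
       int create(int balance)
       {
          accounts.push_back({ static_cast<int>(accounts.size()), balance });
          return accounts.back().id;
       }
       toy_account& get(int id) { return accounts.at(id); }  // lookup by ID is always safe
    };

    int main()
    {
       toy_db db;
       int alice_id = db.create(100);
       // const toy_account& alice = db.get(alice_id);  // risky: may dangle after more objects are created
       for (int i = 0; i < 1000; ++i)
          db.create(0);                                  // forces reallocation
       assert(db.get(alice_id).balance == 100);          // safe: re-fetch by ID at the point of use
       return 0;
    }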

Resolve #373: Add object notifiers

Various fixes

Fixes to various issues, primarily reflections, that cropped up
during merge conflict resolution

Fix startup bug in Bookie plugin

Bookie plugin was preventing the node from starting up: it registered secondary indexes (which create objects in Bookie's own primary indexes to track objects being created in other primary indexes) during its `initialize()` step, which is to say, before the database was loaded from disk at startup. As a result, the secondary indexes created tracker objects while the observed indexes were still loading objects from disk, and startup then failed when the tracker indexes themselves were later loaded from disk and the first object IDs collided.

The fix is to defer registering secondary indexes to the `startup()` stage rather than the `initialize()` stage: primary indexes are registered in `initialize()`, secondary indexes in `startup()` (see the sketch below).

This also involved adding a new method, "add_secondary_index()", to
`object_database`, as before there was no way to do this because you
couldn't get a non-const index from a non-const database.

I have no idea how this was working before I got here...
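
A self-contained toy sketch of the initialize()/startup() split described above (all names and signatures here are hypothetical stand-ins, not the real Bookie plugin or object_database API):

    #include <functional>
    #include <vector>

    struct toy_database
    {
       std::vector<std::function<void()>> creation_hooks;  // stand-in for secondary-index callbacks
       void add_primary_index() { /* define storage for a new object family; safe before load */ }
       void add_secondary_index(std::function<void()> hook) { creation_hooks.push_back(std::move(hook)); }
       void load_from_disk() { /* replays stored objects; must not fire tracker creation */ }
    };

    struct toy_bookie_plugin
    {
       toy_database& db;
       void initialize() { db.add_primary_index(); }  // primary indexes only
       void startup()                                 // runs only after load_from_disk()
       {
          db.add_secondary_index([] { /* create a tracker object for each newly observed object */ });
       }
    };

    int main()
    {
       toy_database db;
       toy_bookie_plugin plugin{db};
       plugin.initialize();  // 1. register primary indexes
       db.load_from_disk();  // 2. replay objects from disk without creating trackers
       plugin.startup();     // 3. attach secondary (tracking) indexes once loading is done
       return 0;
    }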

Fix egenesis install

Fixes after updates

Rebase on updated develop branch and fix conflicts
Committed 2021-11-11 11:25:47 -05:00


/*
* Copyright (c) 2015 Cryptonomex, Inc., and contributors.
*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <graphene/net/peer_connection.hpp>
#include <graphene/net/exceptions.hpp>
#include <graphene/net/config.hpp>
#include <graphene/protocol/config.hpp>
#include <graphene/protocol/fee_schedule.hpp>
#include <fc/io/raw_fwd.hpp>
#include <fc/thread/thread.hpp>
#include <boost/scope_exit.hpp>
#ifdef DEFAULT_LOGGER
# undef DEFAULT_LOGGER
#endif
#define DEFAULT_LOGGER "p2p"
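// All peer_connection methods are expected to run on the fc::thread that created the connection;
// in debug builds VERIFY_CORRECT_THREAD() asserts this, and in release builds it compiles to nothing.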
#ifndef NDEBUG
# define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
#else
# define VERIFY_CORRECT_THREAD() do {} while (0)
#endif
namespace graphene { namespace net
{
message peer_connection::real_queued_message::get_message(peer_connection_delegate*)
{
if (message_send_time_field_offset != (size_t)-1)
{
// patch the current time into the message. Since this operates on the packed version of the structure,
// it won't work for anything after a variable-length field
std::vector<char> packed_current_time = fc::raw::pack(fc::time_point::now());
assert(message_send_time_field_offset + packed_current_time.size() <= message_to_send.data.size());
memcpy(message_to_send.data.data() + message_send_time_field_offset,
packed_current_time.data(), packed_current_time.size());
}
return message_to_send;
}
size_t peer_connection::real_queued_message::get_size_in_queue()
{
return message_to_send.data.size();
}
message peer_connection::virtual_queued_message::get_message(peer_connection_delegate* node)
{
return node->get_message_for_item(item_to_send);
}
size_t peer_connection::virtual_queued_message::get_size_in_queue()
{
return sizeof(item_id);
}
peer_connection::peer_connection(peer_connection_delegate* delegate) :
_node(delegate),
_message_connection(this),
_total_queued_messages_size(0),
direction(peer_connection_direction::unknown),
is_firewalled(firewalled_state::unknown),
our_state(our_connection_state::disconnected),
they_have_requested_close(false),
their_state(their_connection_state::disconnected),
we_have_requested_close(false),
negotiation_status(connection_negotiation_status::disconnected),
number_of_unfetched_item_ids(0),
peer_needs_sync_items_from_us(true),
we_need_sync_items_from_peer(true),
inhibit_fetching_sync_blocks(false),
transaction_fetching_inhibited_until(fc::time_point::min()),
last_known_fork_block_number(0),
firewall_check_state(nullptr),
#ifndef NDEBUG
_thread(&fc::thread::current()),
_send_message_queue_tasks_running(0),
#endif
_currently_handling_message(false)
{
}
peer_connection_ptr peer_connection::make_shared(peer_connection_delegate* delegate)
{
// The lifetime of peer_connection objects is managed by shared_ptrs in node. The peer_connection
// is responsible for notifying the node when it should be deleted, and the process of deleting it
// cleans up the peer connection's asynchronous tasks which are responsible for notifying the node
// when it should be deleted.
// To ease this vicious cycle, we slightly delay the execution of the destructor until the
// current task yields. In the (not uncommon) case where it is the task executing
// connect_to or read_loop, this allows the task to finish before the destructor is forced
// to cancel it.
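// NOTE: the deferred-delete custom deleter described above is currently commented out (see below),
// so the returned shared_ptr uses the default deleter and the destructor runs without that delay.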
return peer_connection_ptr(new peer_connection(delegate));
//, [](peer_connection* peer_to_delete){ fc::async([peer_to_delete](){delete peer_to_delete;}); });
}
void peer_connection::destroy()
{
VERIFY_CORRECT_THREAD();
#if 0 // this gets too verbose
#ifndef NDEBUG
struct scope_logger {
fc::optional<fc::ip::endpoint> endpoint;
scope_logger(const fc::optional<fc::ip::endpoint>& endpoint) : endpoint(endpoint) { dlog("entering peer_connection::destroy() for peer ${endpoint}", ("endpoint", endpoint)); }
~scope_logger() { dlog("leaving peer_connection::destroy() for peer ${endpoint}", ("endpoint", endpoint)); }
} send_message_scope_logger(get_remote_endpoint());
#endif
#endif
try
{
dlog("calling close_connection()");
close_connection();
dlog("close_connection completed normally");
}
catch ( const fc::canceled_exception& )
{
assert(false && "the task that deletes peers should not be canceled because it will prevent us from cleaning up correctly");
}
catch ( ... )
{
dlog("close_connection threw");
}
try
{
dlog("canceling _send_queued_messages task");
_send_queued_messages_done.cancel_and_wait(__FUNCTION__);
dlog("cancel_and_wait completed normally");
}
catch( const fc::exception& e )
{
wlog("Unexpected exception from peer_connection's send_queued_messages_task : ${e}", ("e", e));
}
catch( ... )
{
wlog("Unexpected exception from peer_connection's send_queued_messages_task");
}
try
{
dlog("canceling accept_or_connect_task");
accept_or_connect_task_done.cancel_and_wait(__FUNCTION__);
dlog("accept_or_connect_task completed normally");
}
catch( const fc::exception& e )
{
wlog("Unexpected exception from peer_connection's accept_or_connect_task : ${e}", ("e", e));
}
catch( ... )
{
wlog("Unexpected exception from peer_connection's accept_or_connect_task");
}
_message_connection.destroy_connection(); // shut down the read loop
}
peer_connection::~peer_connection()
{
VERIFY_CORRECT_THREAD();
destroy();
}
fc::tcp_socket& peer_connection::get_socket()
{
VERIFY_CORRECT_THREAD();
return _message_connection.get_socket();
}
void peer_connection::accept_connection()
{
VERIFY_CORRECT_THREAD();
struct scope_logger {
scope_logger() { dlog("entering peer_connection::accept_connection()"); }
~scope_logger() { dlog("leaving peer_connection::accept_connection()"); }
} accept_connection_scope_logger;
try
{
assert( our_state == our_connection_state::disconnected &&
their_state == their_connection_state::disconnected );
direction = peer_connection_direction::inbound;
negotiation_status = connection_negotiation_status::accepting;
_message_connection.accept(); // perform key exchange
negotiation_status = connection_negotiation_status::accepted;
_remote_endpoint = _message_connection.get_socket().remote_endpoint();
// firewall-detecting info is pretty useless for inbound connections, but initialize
// it the best we can
fc::ip::endpoint local_endpoint = _message_connection.get_socket().local_endpoint();
inbound_address = local_endpoint.get_address();
inbound_port = local_endpoint.port();
outbound_port = inbound_port;
their_state = their_connection_state::just_connected;
our_state = our_connection_state::just_connected;
ilog( "established inbound connection from ${remote_endpoint}, sending hello", ("remote_endpoint", _message_connection.get_socket().remote_endpoint() ) );
}
catch ( const fc::exception& e )
{
wlog( "error accepting connection ${e}", ("e", e.to_detail_string() ) );
throw;
}
}
void peer_connection::connect_to( const fc::ip::endpoint& remote_endpoint, fc::optional<fc::ip::endpoint> local_endpoint )
{
VERIFY_CORRECT_THREAD();
try
{
assert( our_state == our_connection_state::disconnected &&
their_state == their_connection_state::disconnected );
direction = peer_connection_direction::outbound;
_remote_endpoint = remote_endpoint;
if( local_endpoint )
{
// the caller wants us to bind the local side of this socket to a specific ip/port
// This depends on the ip/port being unused, and on being able to set the
// SO_REUSEADDR/SO_REUSEPORT flags, and either of these might fail, so we need to
// detect if this fails.
try
{
_message_connection.bind( *local_endpoint );
}
catch ( const fc::canceled_exception& )
{
throw;
}
catch ( const fc::exception& except )
{
wlog( "Failed to bind to desired local endpoint ${endpoint}, will connect using an OS-selected endpoint: ${except}", ("endpoint", *local_endpoint )("except", except ) );
}
}
negotiation_status = connection_negotiation_status::connecting;
_message_connection.connect_to( remote_endpoint );
negotiation_status = connection_negotiation_status::connected;
their_state = their_connection_state::just_connected;
our_state = our_connection_state::just_connected;
ilog( "established outbound connection to ${remote_endpoint}", ("remote_endpoint", remote_endpoint ) );
}
catch ( fc::exception& e )
{
wlog( "fatal: error connecting to peer ${remote_endpoint}: ${e}", ("remote_endpoint", remote_endpoint )("e", e.to_detail_string() ) );
throw;
}
} // connect_to()
void peer_connection::on_message( message_oriented_connection* originating_connection, const message& received_message )
{
VERIFY_CORRECT_THREAD();
_currently_handling_message = true;
BOOST_SCOPE_EXIT(this_) {
this_->_currently_handling_message = false;
} BOOST_SCOPE_EXIT_END
_node->on_message( this, received_message );
}
void peer_connection::on_connection_closed( message_oriented_connection* originating_connection )
{
VERIFY_CORRECT_THREAD();
negotiation_status = connection_negotiation_status::closed;
_node->on_connection_closed( this );
}
void peer_connection::send_queued_messages_task()
{
VERIFY_CORRECT_THREAD();
#ifndef NDEBUG
struct counter {
unsigned& _send_message_queue_tasks_counter;
counter(unsigned& var) : _send_message_queue_tasks_counter(var) { /* dlog("entering peer_connection::send_queued_messages_task()"); */ assert(_send_message_queue_tasks_counter == 0); ++_send_message_queue_tasks_counter; }
~counter() { assert(_send_message_queue_tasks_counter == 1); --_send_message_queue_tasks_counter; /* dlog("leaving peer_connection::send_queued_messages_task()"); */ }
} concurrent_invocation_counter(_send_message_queue_tasks_running);
#endif
while (!_queued_messages.empty())
{
_queued_messages.front()->transmission_start_time = fc::time_point::now();
message message_to_send = _queued_messages.front()->get_message(_node);
try
{
//dlog("peer_connection::send_queued_messages_task() calling message_oriented_connection::send_message() "
// "to send message of type ${type} for peer ${endpoint}",
// ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
_message_connection.send_message(message_to_send);
//dlog("peer_connection::send_queued_messages_task()'s call to message_oriented_connection::send_message() completed normally for peer ${endpoint}",
// ("endpoint", get_remote_endpoint()));
}
catch (const fc::canceled_exception&)
{
dlog("message_oriented_connection::send_message() was canceled, rethrowing canceled_exception");
throw;
}
catch (const fc::exception& send_error)
{
wlog("Error sending message: ${exception}. Closing connection.", ("exception", send_error));
try
{
close_connection();
}
catch (const fc::exception& close_error)
{
wlog("Caught error while closing connection: ${exception}", ("exception", close_error));
}
return;
}
catch (const std::exception& e)
{
wlog("message_oriented_exception::send_message() threw a std::exception(): ${what}", ("what", e.what()));
}
catch (...)
{
wlog("message_oriented_exception::send_message() threw an unhandled exception");
}
_queued_messages.front()->transmission_finish_time = fc::time_point::now();
_total_queued_messages_size -= _queued_messages.front()->get_size_in_queue();
_queued_messages.pop();
}
//dlog("leaving peer_connection::send_queued_messages_task() due to queue exhaustion");
}
void peer_connection::send_queueable_message(std::unique_ptr<queued_message>&& message_to_send)
{
VERIFY_CORRECT_THREAD();
_total_queued_messages_size += message_to_send->get_size_in_queue();
_queued_messages.emplace(std::move(message_to_send));
if (_total_queued_messages_size > GRAPHENE_NET_MAXIMUM_QUEUED_MESSAGES_IN_BYTES)
{
wlog("send queue exceeded maximum size of ${max} bytes (current size ${current} bytes)",
("max", GRAPHENE_NET_MAXIMUM_QUEUED_MESSAGES_IN_BYTES)("current", _total_queued_messages_size));
try
{
close_connection();
}
catch (const fc::exception& e)
{
elog("Caught error while closing connection: ${exception}", ("exception", e));
}
return;
}
if( _send_queued_messages_done.valid() && _send_queued_messages_done.canceled() )
FC_THROW_EXCEPTION(fc::exception, "Attempting to send a message on a connection that is being shut down");
if (!_send_queued_messages_done.valid() || _send_queued_messages_done.ready())
{
//dlog("peer_connection::send_message() is firing up send_queued_message_task");
_send_queued_messages_done = fc::async([this](){ send_queued_messages_task(); }, "send_queued_messages_task");
}
//else
// dlog("peer_connection::send_message() doesn't need to fire up send_queued_message_task, it's already running");
}
void peer_connection::send_message(const message& message_to_send, size_t message_send_time_field_offset)
{
VERIFY_CORRECT_THREAD();
//dlog("peer_connection::send_message() enqueueing message of type ${type} for peer ${endpoint}",
// ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint()));
std::unique_ptr<queued_message> message_to_enqueue(new real_queued_message(message_to_send, message_send_time_field_offset));
send_queueable_message(std::move(message_to_enqueue));
}
void peer_connection::send_item(const item_id& item_to_send)
{
VERIFY_CORRECT_THREAD();
//dlog("peer_connection::send_item() enqueueing message of type ${type} for peer ${endpoint}",
// ("type", item_to_send.item_type)("endpoint", get_remote_endpoint()));
std::unique_ptr<queued_message> message_to_enqueue(new virtual_queued_message(item_to_send));
send_queueable_message(std::move(message_to_enqueue));
}
void peer_connection::close_connection()
{
VERIFY_CORRECT_THREAD();
negotiation_status = connection_negotiation_status::closing;
if (connection_terminated_time != fc::time_point::min())
connection_terminated_time = fc::time_point::now();
_message_connection.close_connection();
}
void peer_connection::destroy_connection()
{
VERIFY_CORRECT_THREAD();
negotiation_status = connection_negotiation_status::closing;
destroy();
}
uint64_t peer_connection::get_total_bytes_sent() const
{
VERIFY_CORRECT_THREAD();
return _message_connection.get_total_bytes_sent();
}
uint64_t peer_connection::get_total_bytes_received() const
{
VERIFY_CORRECT_THREAD();
return _message_connection.get_total_bytes_received();
}
fc::time_point peer_connection::get_last_message_sent_time() const
{
VERIFY_CORRECT_THREAD();
return _message_connection.get_last_message_sent_time();
}
fc::time_point peer_connection::get_last_message_received_time() const
{
VERIFY_CORRECT_THREAD();
return _message_connection.get_last_message_received_time();
}
fc::optional<fc::ip::endpoint> peer_connection::get_remote_endpoint()
{
VERIFY_CORRECT_THREAD();
return _remote_endpoint;
}
fc::ip::endpoint peer_connection::get_local_endpoint()
{
VERIFY_CORRECT_THREAD();
return _message_connection.get_socket().local_endpoint();
}
void peer_connection::set_remote_endpoint( fc::optional<fc::ip::endpoint> new_remote_endpoint )
{
VERIFY_CORRECT_THREAD();
_remote_endpoint = new_remote_endpoint;
}
bool peer_connection::busy() const
{
VERIFY_CORRECT_THREAD();
return !items_requested_from_peer.empty() || !sync_items_requested_from_peer.empty() || item_ids_requested_from_peer;
}
bool peer_connection::idle() const
{
VERIFY_CORRECT_THREAD();
return !busy();
}
bool peer_connection::is_currently_handling_message() const
{
VERIFY_CORRECT_THREAD();
return _currently_handling_message;
}
bool peer_connection::is_transaction_fetching_inhibited() const
{
VERIFY_CORRECT_THREAD();
return transaction_fetching_inhibited_until > fc::time_point::now();
}
fc::sha512 peer_connection::get_shared_secret() const
{
VERIFY_CORRECT_THREAD();
return _message_connection.get_shared_secret();
}
void peer_connection::clear_old_inventory()
{
VERIFY_CORRECT_THREAD();
fc::time_point_sec oldest_inventory_to_keep(fc::time_point::now() - fc::minutes(GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES));
// expire old items from inventory_advertised_to_peer
auto oldest_inventory_to_keep_iter = inventory_advertised_to_peer.get<timestamp_index>().lower_bound(oldest_inventory_to_keep);
auto begin_iter = inventory_advertised_to_peer.get<timestamp_index>().begin();
unsigned number_of_elements_advertised_to_peer_to_discard = std::distance(begin_iter, oldest_inventory_to_keep_iter);
inventory_advertised_to_peer.get<timestamp_index>().erase(begin_iter, oldest_inventory_to_keep_iter);
// also expire items from inventory_peer_advertised_to_us
oldest_inventory_to_keep_iter = inventory_peer_advertised_to_us.get<timestamp_index>().lower_bound(oldest_inventory_to_keep);
begin_iter = inventory_peer_advertised_to_us.get<timestamp_index>().begin();
unsigned number_of_elements_peer_advertised_to_discard = std::distance(begin_iter, oldest_inventory_to_keep_iter);
inventory_peer_advertised_to_us.get<timestamp_index>().erase(begin_iter, oldest_inventory_to_keep_iter);
dlog("Expiring old inventory for peer ${peer}: removing ${to_peer} items advertised to peer (${remain_to_peer} left), and ${to_us} advertised to us (${remain_to_us} left)",
("peer", get_remote_endpoint())
("to_peer", number_of_elements_advertised_to_peer_to_discard)("remain_to_peer", inventory_advertised_to_peer.size())
("to_us", number_of_elements_peer_advertised_to_discard)("remain_to_us", inventory_peer_advertised_to_us.size()));
}
// we have a higher limit for blocks than transactions so we will still fetch blocks even when transactions are throttled
bool peer_connection::is_inventory_advertised_to_us_list_full_for_transactions() const
{
VERIFY_CORRECT_THREAD();
return inventory_peer_advertised_to_us.size() > GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES * GRAPHENE_NET_MAX_TRX_PER_SECOND * 60;
}
bool peer_connection::is_inventory_advertised_to_us_list_full() const
{
VERIFY_CORRECT_THREAD();
// allow the total inventory size to be the maximum number of transactions we'll store in the inventory (above)
// plus the maximum number of blocks that would be generated in GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES (plus one,
// to give us some wiggle room)
return inventory_peer_advertised_to_us.size() >
GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES * GRAPHENE_NET_MAX_TRX_PER_SECOND * 60 +
(GRAPHENE_NET_MAX_INVENTORY_SIZE_IN_MINUTES + 1) * 60 / GRAPHENE_MIN_BLOCK_INTERVAL;
}
bool peer_connection::performing_firewall_check() const
{
return firewall_check_state && firewall_check_state->requesting_peer != node_id_t();
}
fc::optional<fc::ip::endpoint> peer_connection::get_endpoint_for_connecting() const
{
if (inbound_port)
return fc::ip::endpoint(inbound_address, inbound_port);
return fc::optional<fc::ip::endpoint>();
}
} } // end namespace graphene::net