restoring p2p code

This commit is contained in:
Daniel Larimer 2015-09-03 08:31:03 -04:00
parent 198cfd1231
commit 86bb4cdbca
11 changed files with 1356 additions and 0 deletions

View file

@ -0,0 +1,32 @@
file(GLOB HEADERS "include/graphene/p2p/*.hpp")
set(SOURCES node.cpp
stcp_socket.cpp
peer_connection.cpp
message_oriented_connection.cpp)
add_library( graphene_p2p ${SOURCES} ${HEADERS} )
target_link_libraries( graphene_p2p
PUBLIC fc graphene_db )
target_include_directories( graphene_p2p
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
PRIVATE "${CMAKE_SOURCE_DIR}/libraries/chain/include"
)
#if(MSVC)
# set_source_files_properties( node.cpp PROPERTIES COMPILE_FLAGS "/bigobj" )
#endif(MSVC)
#if (USE_PCH)
# set_target_properties(graphene_p2p PROPERTIES COTIRE_ADD_UNITY_BUILD FALSE)
# cotire(graphene_p2p )
#endif(USE_PCH)
install( TARGETS
graphene_p2p
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
)

90
libraries/p2p/design.md Normal file
View file

@ -0,0 +1,90 @@
# Network Protocol 2
Building a low-latency network requires P2P nodes that have low-latency
connections and a protocol designed to minimize latency. for the purpose
of this document we will assume that two nodes are located on opposite
sides of the globe with a ping time of 250ms.
## Announce, Request, Send Protocol
Under the prior network archtiecture, transactions and blocks were broadcast
in a manner similar to the Bitcoin protocol: inventory messages notify peers of
transactions and blocks, then peers fetch the transaction or block from one
peer. After validating the item a node will broadcast an inventory message to
its peers.
Under this model it will take 0.75 seconds for a peer to communicate a transaction
or block to another peer even if their size was 0 and there was no processing overhead.
This level of performance is unacceptable for a network attempting to produce one block
every second.
This prior protocol also sent every transaction twice: initial broadcast, and again as
part of a block.
## Push Protocol
To minimize latency each node needs to immediately broadcast the data it receives
to its peers after validating it. Given the average transaction size is less than
100 bytes, it is almost as effecient to send the transaction as it is to send
the notice (assuming a 20 byte transaction id)
Each node implements the following protocol:
onReceiveTransaction( from_peer, transaction )
if( isKnown( transaction.id() ) )
return
markKnown( transaction.id() )
if( !validate( transaction ) )
return
for( peer : peers )
if( peer != from_peer )
send( peer, transaction )
onReceiveBlock( from_peer, block_summary )
if( isKnown( block_summary )
return
full_block = reconstructFullBlcok( from_peer, block_summary )
if( !full_block ) disconnect from_peer
markKnown( block_summary )
if( !pushBlock( full_block ) ) disconnect from_peer
for( peer : peers )
if( peer != from_peer )
send( peer, block_summary )
onConnect( new_peer, new_peer_head_block_num )
if( peers.size() >= max_peers )
send( new_peer, peers )
disconnect( new_peer )
return
while( new_peer_head_block_num < our_head_block_num )
sendFullBlock( new_peer, ++new_peer_head_block_num )
new_peer.synced = true
for( peer : peers )
send( peer, new_peer )
onReceivePeers( from_peer, peers )
addToPotentialPeers( peers )
onUpdateConnectionsTimer
if( peers.size() < desired_peers )
connect( random_potential_peer )
onFullBlock( from_peer, full_block )
if( !pushBlock( full_block ) ) disconnect from_peer
onStartup
init_potential_peers from config
start onUpdateConnectionsTimer

View file

@ -0,0 +1,151 @@
/** Copyright (c) 2015, Cryptonomex, Inc. All rights reserved. */
#pragma once
#include <fc/array.hpp>
#include <fc/io/varint.hpp>
#include <fc/network/ip.hpp>
#include <fc/io/raw.hpp>
#include <fc/crypto/ripemd160.hpp>
#include <fc/reflect/variant.hpp>
namespace graphene { namespace p2p {
struct message_header
{
uint32_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE
uint32_t msg_type;
};
typedef fc::uint160_t message_hash_type;
/**
* Abstracts the process of packing/unpacking a message for a
* particular channel.
*/
struct message : public message_header
{
std::vector<char> data;
message(){}
message( message&& m )
:message_header(m),data( std::move(m.data) ){}
message( const message& m )
:message_header(m),data( m.data ){}
/**
* Assumes that T::type specifies the message type
*/
template<typename T>
message( const T& m )
{
msg_type = T::type;
data = fc::raw::pack(m);
size = (uint32_t)data.size();
}
fc::uint160_t id()const
{
return fc::ripemd160::hash( data.data(), (uint32_t)data.size() );
}
/**
* Automatically checks the type and deserializes T in the
* opposite process from the constructor.
*/
template<typename T>
T as()const
{
try {
FC_ASSERT( msg_type == T::type );
T tmp;
if( data.size() )
{
fc::datastream<const char*> ds( data.data(), data.size() );
fc::raw::unpack( ds, tmp );
}
else
{
// just to make sure that tmp shouldn't have any data
fc::datastream<const char*> ds( nullptr, 0 );
fc::raw::unpack( ds, tmp );
}
return tmp;
} FC_RETHROW_EXCEPTIONS( warn,
"error unpacking network message as a '${type}' ${x} !=? ${msg_type}",
("type", fc::get_typename<T>::name() )
("x", T::type)
("msg_type", msg_type)
);
}
};
enum core_message_type_enum {
hello_message_type = 1000,
transaction_message_type = 1001,
block_message_type = 1002,
peer_message_type = 1003,
error_message_type = 1004
};
struct hello_message
{
static const core_message_type_enum type;
std::string user_agent;
uint16_t version;
fc::ip::address inbound_address;
uint16_t inbound_port;
uint16_t outbound_port;
node_id_t node_public_key;
fc::sha256 chain_id;
fc::variant_object user_data;
block_id_type head_block;
};
struct transaction_message
{
static const core_message_type_enum type;
signed_transaction trx;
};
struct block_summary_message
{
static const core_message_type_enum type;
signed_block_header header;
vector<transaction_id_type> transaction_ids;
};
struct full_block_message
{
static const core_message_type_enum type;
signed_block block;
};
struct peers_message
{
static const core_message_type_enum type;
vector<fc::ip::endpoint> peers;
};
struct error_message
{
static const core_message_type_enum type;
string message;
};
} } // graphene::p2p
FC_REFLECT( graphene::p2p::message_header, (size)(msg_type) )
FC_REFLECT_DERIVED( graphene::p2p::message, (graphene::p2p::message_header), (data) )
FC_REFLECT_ENUM( graphene::p2p::core_message_type_enum,
(hello_message_type)
(transaction_message_type)
(block_message_type)
(peer_message_type)
(error_message_type)
)

View file

@ -0,0 +1,49 @@
/** Copyright (c) 2015, Cryptonomex, Inc. All rights reserved. */
#pragma once
#include <fc/network/tcp_socket.hpp>
#include <graphene/p2p/message.hpp>
namespace graphene { namespace p2p {
namespace detail { class message_oriented_connection_impl; }
class message_oriented_connection;
/** receives incoming messages from a message_oriented_connection object */
class message_oriented_connection_delegate
{
public:
virtual void on_message( message_oriented_connection* originating_connection,
const message& received_message) = 0;
virtual void on_connection_closed(message_oriented_connection* originating_connection) = 0;
};
/** uses a secure socket to create a connection that reads and writes a stream of `fc::p2p::message` objects */
class message_oriented_connection
{
public:
message_oriented_connection(message_oriented_connection_delegate* delegate = nullptr);
~message_oriented_connection();
fc::tcp_socket& get_socket();
void accept();
void bind(const fc::ip::endpoint& local_endpoint);
void connect_to(const fc::ip::endpoint& remote_endpoint);
void send_message(const message& message_to_send);
void close_connection();
void destroy_connection();
uint64_t get_total_bytes_sent() const;
uint64_t get_total_bytes_received() const;
fc::time_point get_last_message_sent_time() const;
fc::time_point get_last_message_received_time() const;
fc::time_point get_connection_time() const;
fc::sha512 get_shared_secret() const;
private:
std::unique_ptr<detail::message_oriented_connection_impl> my;
};
typedef std::shared_ptr<message_oriented_connection> message_oriented_connection_ptr;
} } // graphene::net

View file

@ -0,0 +1,74 @@
/** Copyright (c) 2015, Cryptonomex, Inc. */
#pragma once
#include <graphene/chain/database.hpp>
#include <graphene/p2p/peer_connection.hpp>
namespace graphene { namespace p2p {
using namespace graphene::chain;
struct node_config
{
fc::ip::endpoint server_endpoint;
bool wait_if_not_available = true;
uint32_t desired_peers;
uint32_t max_peers;
/** receive, but don't rebroadcast data */
bool subscribe_only = false;
public_key_type node_id;
vector<fc::ip::endpoint> seed_nodes;
};
struct by_remote_endpoint;
struct by_peer_id;
/**
* @ingroup object_index
*/
typedef multi_index_container<
peer_connection_ptr,
indexed_by<
ordered_unique< tag<by_remote_endpoint>,
const_mem_fun< peer_connection, fc::ip::endpoint, &peer_connection::get_remote_endpoint > >,
ordered_unique< tag<by_peer_id>, member< peer_connection, public_key_type, &peer_connection::node_id > >
>
> peer_connection_index;
class node : public std::enable_shared_from_this<node>
{
public:
server( chain_database& db );
void add_peer( const fc::ip::endpoint& ep );
void configure( const node_config& cfg );
void on_incomming_connection( peer_connection_ptr new_peer );
void on_hello( peer_connection_ptr new_peer, hello_message m );
void on_transaction( peer_connection_ptr from_peer, transaction_message m );
void on_block( peer_connection_ptr from_peer, block_message m );
void on_peers( peer_connection_ptr from_peer, peers_message m );
void on_error( peer_connection_ptr from_peer, error_message m );
void on_full_block( peer_connection_ptr from_peer, full_block_message m );
void on_update_connections();
private:
/**
* Specifies the network interface and port upon which incoming
* connections should be accepted.
*/
void listen_on_endpoint( fc::ip::endpoint ep, bool wait_if_not_available );
void accept_loop();
graphene::chain::database& _db;
fc::tcp_server _tcp_server;
fc::ip::endpoint _actual_listening_endpoint;
fc::future<void> _accept_loop_complete;
peer_connection_index _peers;
};
} } /// graphene::p2p

View file

@ -0,0 +1,179 @@
/*
* Copyright (c) 2015, Cryptonomex, Inc.
* All rights reserved.
*
* This source code is provided for evaluation in private test networks only, until September 8, 2015. After this date, this license expires and
* the code may not be used, modified or distributed for any purpose. Redistribution and use in source and binary forms, with or without modification,
* are permitted until September 8, 2015, provided that the following conditions are met:
*
* 1. The code and/or derivative works are used only for private test networks consisting of no more than 10 P2P nodes.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <graphene/p2p/node.hpp>
#include <graphene/p2p/message_oriented_connection.hpp>
#include <graphene/p2p/stcp_socket.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <queue>
#include <boost/container/deque.hpp>
#include <fc/thread/future.hpp>
namespace graphene { namespace p2p {
class peer_connection;
class peer_connection_delegate
{
public:
virtual void on_message(peer_connection* originating_peer,
const message& received_message) = 0;
virtual void on_connection_closed(peer_connection* originating_peer) = 0;
virtual message get_message_for_item(const item_id& item) = 0;
};
class peer_connection;
typedef std::shared_ptr<peer_connection> peer_connection_ptr;
class peer_connection : public message_oriented_connection_delegate,
public std::enable_shared_from_this<peer_connection>
{
public:
enum direction_type { inbound, outbound };
enum connection_state {
connecting = 0,
syncing = 1,
synced = 2
};
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
fc::time_point connection_terminated_time;
direction_type direction = outbound;
connection_state state = connecting;
bool is_firewalled = true
//connection_state state;
fc::microseconds clock_offset;
fc::microseconds round_trip_delay;
/// data about the peer node
/// @{
/** the unique identifier we'll use to refer to the node with. zero-initialized before
* we receive the hello message, at which time it will be filled with either the "node_id"
* from the user_data field of the hello, or if none is present it will be filled with a
* copy of node_public_key */
public_key_type node_id;
uint32_t core_protocol_version;
std::string user_agent;
fc::optional<std::string> graphene_git_revision_sha;
fc::optional<fc::time_point_sec> graphene_git_revision_unix_timestamp;
fc::optional<std::string> fc_git_revision_sha;
fc::optional<fc::time_point_sec> fc_git_revision_unix_timestamp;
fc::optional<std::string> platform;
fc::optional<uint32_t> bitness;
// for inbound connections, these fields record what the peer sent us in
// its hello message. For outbound, they record what we sent the peer
// in our hello message
fc::ip::address inbound_address;
uint16_t inbound_port;
uint16_t outbound_port;
/// @}
void send( transaction_message_ptr msg )
{
// if not in sent_or_received then insert into _pending_send
// if process_send_queue is invalid or complete then
// async process_send_queue
}
void received_transaction( const transaction_id_type& id )
{
_sent_or_received.insert(id);
}
void process_send_queue()
{
// while _pending_send.size() || _pending_blocks.size()
// while there are pending blocks, then take the oldest
// for each transaction id, verify that it exists in _sent_or_received
// else find it in the _pending_send queue and send it
// send one from _pending_send
}
std::unordered_map<transaction_id_type, transaction_message_ptr> _pending_send;
/// todo: make multi-index that tracks how long items have been cached and removes them
/// after a resasonable period of time (say 10 seconds)
std::unordered_set<transaction_id_type> _sent_or_received;
std::map<uint32_t,block_message_ptr> _pending_blocks;
fc::ip::endpoint get_remote_endpoint()const
{ return get_socket().get_remote_endpoint(); }
void on_message(message_oriented_connection* originating_connection,
const message& received_message) override
{
switch( core_message_type_enum( received_message.type ) )
{
case hello_message_type:
_node->on_hello( shared_from_this(),
received_message.as<hello_message>() );
break;
case transaction_message_type:
_node->on_transaction( shared_from_this(),
received_message.as<transaction_message>() );
break;
case block_message_type:
_node->on_block( shared_from_this(),
received_message.as<block_message>() );
break;
case peer_message_type:
_node->on_peers( shared_from_this(),
received_message.as<peers_message>() );
break;
}
}
void on_connection_closed(message_oriented_connection* originating_connection) override
{
_node->on_close( shared_from_this() );
}
fc::tcp_socket& get_socket() { return _message_connection.get_socket(); }
private:
peer_connection_delegate* _node;
fc::optional<fc::ip::endpoint> _remote_endpoint;
message_oriented_connection _message_connection;
};
typedef std::shared_ptr<peer_connection> peer_connection_ptr;
} } // end namespace graphene::p2p
// not sent over the wire, just reflected for logging
FC_REFLECT_ENUM(graphene::p2p::peer_connection::connection_state, (connecting)(syncing)(synced) )
FC_REFLECT_ENUM(graphene::p2p::peer_connection::direction_type, (inbound)(outbound) )

View file

@ -0,0 +1,59 @@
/*
* Copyright (c) 2015, Cryptonomex, Inc. All rights reserved.
*/
#pragma once
#include <fc/network/tcp_socket.hpp>
#include <fc/crypto/aes.hpp>
#include <fc/crypto/elliptic.hpp>
namespace graphene { namespace p2p {
/**
* Uses ECDH to negotiate a aes key for communicating
* with other nodes on the network.
*/
class stcp_socket : public virtual fc::iostream
{
public:
stcp_socket();
~stcp_socket();
fc::tcp_socket& get_socket() { return _sock; }
void accept();
void connect_to( const fc::ip::endpoint& remote_endpoint );
void bind( const fc::ip::endpoint& local_endpoint );
virtual size_t readsome( char* buffer, size_t max );
virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset );
virtual bool eof()const;
virtual size_t writesome( const char* buffer, size_t len );
virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset );
virtual void flush();
virtual void close();
using istream::get;
void get( char& c ) { read( &c, 1 ); }
fc::sha512 get_shared_secret() const { return _shared_secret; }
private:
void do_key_exchange();
fc::sha512 _shared_secret;
fc::ecc::private_key _priv_key;
fc::array<char,8> _buf;
//uint32_t _buf_len;
fc::tcp_socket _sock;
fc::aes_encoder _send_aes;
fc::aes_decoder _recv_aes;
std::shared_ptr<char> _read_buffer;
std::shared_ptr<char> _write_buffer;
#ifndef NDEBUG
bool _read_buffer_in_use;
bool _write_buffer_in_use;
#endif
};
typedef std::shared_ptr<stcp_socket> stcp_socket_ptr;
} } // graphene::p2p

View file

@ -0,0 +1,393 @@
/*
* Copyright (c) 2015, Cryptonomex, Inc.
* All rights reserved.
*/
#include <fc/thread/thread.hpp>
#include <fc/thread/mutex.hpp>
#include <fc/thread/scoped_lock.hpp>
#include <fc/thread/future.hpp>
#include <fc/log/logger.hpp>
#include <fc/io/enum_type.hpp>
#include <graphene/p2p/message_oriented_connection.hpp>
#include <graphene/p2p/stcp_socket.hpp>
#include <graphene/p2p/config.hpp>
#ifdef DEFAULT_LOGGER
# undef DEFAULT_LOGGER
#endif
#define DEFAULT_LOGGER "p2p"
#ifndef NDEBUG
# define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
#else
# define VERIFY_CORRECT_THREAD() do {} while (0)
#endif
namespace graphene { namespace p2p {
namespace detail
{
class message_oriented_connection_impl
{
private:
message_oriented_connection* _self;
message_oriented_connection_delegate *_delegate;
stcp_socket _sock;
fc::future<void> _read_loop_done;
uint64_t _bytes_received;
uint64_t _bytes_sent;
fc::time_point _connected_time;
fc::time_point _last_message_received_time;
fc::time_point _last_message_sent_time;
bool _send_message_in_progress;
#ifndef NDEBUG
fc::thread* _thread;
#endif
void read_loop();
void start_read_loop();
public:
fc::tcp_socket& get_socket();
void accept();
void connect_to(const fc::ip::endpoint& remote_endpoint);
void bind(const fc::ip::endpoint& local_endpoint);
message_oriented_connection_impl(message_oriented_connection* self,
message_oriented_connection_delegate* delegate = nullptr);
~message_oriented_connection_impl();
void send_message(const message& message_to_send);
void close_connection();
void destroy_connection();
uint64_t get_total_bytes_sent() const;
uint64_t get_total_bytes_received() const;
fc::time_point get_last_message_sent_time() const;
fc::time_point get_last_message_received_time() const;
fc::time_point get_connection_time() const { return _connected_time; }
fc::sha512 get_shared_secret() const;
};
message_oriented_connection_impl::message_oriented_connection_impl(message_oriented_connection* self,
message_oriented_connection_delegate* delegate)
: _self(self),
_delegate(delegate),
_bytes_received(0),
_bytes_sent(0),
_send_message_in_progress(false)
#ifndef NDEBUG
,_thread(&fc::thread::current())
#endif
{
}
message_oriented_connection_impl::~message_oriented_connection_impl()
{
VERIFY_CORRECT_THREAD();
destroy_connection();
}
fc::tcp_socket& message_oriented_connection_impl::get_socket()
{
VERIFY_CORRECT_THREAD();
return _sock.get_socket();
}
void message_oriented_connection_impl::accept()
{
VERIFY_CORRECT_THREAD();
_sock.accept();
assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
_read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
}
void message_oriented_connection_impl::connect_to(const fc::ip::endpoint& remote_endpoint)
{
VERIFY_CORRECT_THREAD();
_sock.connect_to(remote_endpoint);
assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
_read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
}
void message_oriented_connection_impl::bind(const fc::ip::endpoint& local_endpoint)
{
VERIFY_CORRECT_THREAD();
_sock.bind(local_endpoint);
}
void message_oriented_connection_impl::read_loop()
{
VERIFY_CORRECT_THREAD();
const int BUFFER_SIZE = 16;
const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");
_connected_time = fc::time_point::now();
fc::oexception exception_to_rethrow;
bool call_on_connection_closed = false;
try
{
message m;
while( true )
{
char buffer[BUFFER_SIZE];
_sock.read(buffer, BUFFER_SIZE);
_bytes_received += BUFFER_SIZE;
memcpy((char*)&m, buffer, sizeof(message_header));
FC_ASSERT( m.size <= MAX_MESSAGE_SIZE, "", ("m.size",m.size)("MAX_MESSAGE_SIZE",MAX_MESSAGE_SIZE) );
size_t remaining_bytes_with_padding = 16 * ((m.size - LEFTOVER + 15) / 16);
m.data.resize(LEFTOVER + remaining_bytes_with_padding); //give extra 16 bytes to allow for padding added in send call
std::copy(buffer + sizeof(message_header), buffer + sizeof(buffer), m.data.begin());
if (remaining_bytes_with_padding)
{
_sock.read(&m.data[LEFTOVER], remaining_bytes_with_padding);
_bytes_received += remaining_bytes_with_padding;
}
m.data.resize(m.size); // truncate off the padding bytes
_last_message_received_time = fc::time_point::now();
try
{
// message handling errors are warnings...
_delegate->on_message(_self, m);
}
/// Dedicated catches needed to distinguish from general fc::exception
catch ( const fc::canceled_exception& e ) { throw e; }
catch ( const fc::eof_exception& e ) { throw e; }
catch ( const fc::exception& e)
{
/// Here loop should be continued so exception should be just caught locally.
wlog( "message transmission failed ${er}", ("er", e.to_detail_string() ) );
throw;
}
}
}
catch ( const fc::canceled_exception& e )
{
wlog( "caught a canceled_exception in read_loop. this should mean we're in the process of deleting this object already, so there's no need to notify the delegate: ${e}", ("e", e.to_detail_string() ) );
throw;
}
catch ( const fc::eof_exception& e )
{
wlog( "disconnected ${e}", ("e", e.to_detail_string() ) );
call_on_connection_closed = true;
}
catch ( const fc::exception& e )
{
elog( "disconnected ${er}", ("er", e.to_detail_string() ) );
call_on_connection_closed = true;
exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.to_detail_string())));
}
catch ( const std::exception& e )
{
elog( "disconnected ${er}", ("er", e.what() ) );
call_on_connection_closed = true;
exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.what())));
}
catch ( ... )
{
elog( "unexpected exception" );
call_on_connection_closed = true;
exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", fc::except_str())));
}
if (call_on_connection_closed)
_delegate->on_connection_closed(_self);
if (exception_to_rethrow)
throw *exception_to_rethrow;
}
void message_oriented_connection_impl::send_message(const message& message_to_send)
{
VERIFY_CORRECT_THREAD();
#if 0 // this gets too verbose
#ifndef NDEBUG
fc::optional<fc::ip::endpoint> remote_endpoint;
if (_sock.get_socket().is_open())
remote_endpoint = _sock.get_socket().remote_endpoint();
struct scope_logger {
const fc::optional<fc::ip::endpoint>& endpoint;
scope_logger(const fc::optional<fc::ip::endpoint>& endpoint) : endpoint(endpoint) { dlog("entering message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
~scope_logger() { dlog("leaving message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
} send_message_scope_logger(remote_endpoint);
#endif
#endif
struct verify_no_send_in_progress {
bool& var;
verify_no_send_in_progress(bool& var) : var(var)
{
if (var)
elog("Error: two tasks are calling message_oriented_connection::send_message() at the same time");
assert(!var);
var = true;
}
~verify_no_send_in_progress() { var = false; }
} _verify_no_send_in_progress(_send_message_in_progress);
try
{
size_t size_of_message_and_header = sizeof(message_header) + message_to_send.size;
if( message_to_send.size > MAX_MESSAGE_SIZE )
elog("Trying to send a message larger than MAX_MESSAGE_SIZE. This probably won't work...");
//pad the message we send to a multiple of 16 bytes
size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
std::unique_ptr<char[]> padded_message(new char[size_with_padding]);
memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header));
memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size );
_sock.write(padded_message.get(), size_with_padding);
_sock.flush();
_bytes_sent += size_with_padding;
_last_message_sent_time = fc::time_point::now();
} FC_RETHROW_EXCEPTIONS( warn, "unable to send message" );
}
void message_oriented_connection_impl::close_connection()
{
VERIFY_CORRECT_THREAD();
_sock.close();
}
void message_oriented_connection_impl::destroy_connection()
{
VERIFY_CORRECT_THREAD();
fc::optional<fc::ip::endpoint> remote_endpoint;
if (_sock.get_socket().is_open())
remote_endpoint = _sock.get_socket().remote_endpoint();
ilog( "in destroy_connection() for ${endpoint}", ("endpoint", remote_endpoint) );
if (_send_message_in_progress)
elog("Error: message_oriented_connection is being destroyed while a send_message is in progress. "
"The task calling send_message() should have been canceled already");
assert(!_send_message_in_progress);
try
{
_read_loop_done.cancel_and_wait(__FUNCTION__);
}
catch ( const fc::exception& e )
{
wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring: ${e}", ("e",e) );
}
catch (...)
{
wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring" );
}
}
uint64_t message_oriented_connection_impl::get_total_bytes_sent() const
{
VERIFY_CORRECT_THREAD();
return _bytes_sent;
}
uint64_t message_oriented_connection_impl::get_total_bytes_received() const
{
VERIFY_CORRECT_THREAD();
return _bytes_received;
}
fc::time_point message_oriented_connection_impl::get_last_message_sent_time() const
{
VERIFY_CORRECT_THREAD();
return _last_message_sent_time;
}
fc::time_point message_oriented_connection_impl::get_last_message_received_time() const
{
VERIFY_CORRECT_THREAD();
return _last_message_received_time;
}
fc::sha512 message_oriented_connection_impl::get_shared_secret() const
{
VERIFY_CORRECT_THREAD();
return _sock.get_shared_secret();
}
} // end namespace graphene::p2p::detail
message_oriented_connection::message_oriented_connection(message_oriented_connection_delegate* delegate) :
my(new detail::message_oriented_connection_impl(this, delegate))
{
}
message_oriented_connection::~message_oriented_connection()
{
}
fc::tcp_socket& message_oriented_connection::get_socket()
{
return my->get_socket();
}
void message_oriented_connection::accept()
{
my->accept();
}
void message_oriented_connection::connect_to(const fc::ip::endpoint& remote_endpoint)
{
my->connect_to(remote_endpoint);
}
void message_oriented_connection::bind(const fc::ip::endpoint& local_endpoint)
{
my->bind(local_endpoint);
}
void message_oriented_connection::send_message(const message& message_to_send)
{
my->send_message(message_to_send);
}
void message_oriented_connection::close_connection()
{
my->close_connection();
}
void message_oriented_connection::destroy_connection()
{
my->destroy_connection();
}
uint64_t message_oriented_connection::get_total_bytes_sent() const
{
return my->get_total_bytes_sent();
}
uint64_t message_oriented_connection::get_total_bytes_received() const
{
return my->get_total_bytes_received();
}
fc::time_point message_oriented_connection::get_last_message_sent_time() const
{
return my->get_last_message_sent_time();
}
fc::time_point message_oriented_connection::get_last_message_received_time() const
{
return my->get_last_message_received_time();
}
fc::time_point message_oriented_connection::get_connection_time() const
{
return my->get_connection_time();
}
fc::sha512 message_oriented_connection::get_shared_secret() const
{
return my->get_shared_secret();
}
} } // end namespace graphene::p2p

141
libraries/p2p/node.cpp Normal file
View file

@ -0,0 +1,141 @@
#include <graphene/p2p/node.hpp>
namespace graphene { namespace p2p {
node::node( chain_database& db )
:_db(db)
{
}
node::~node()
{
}
void node::add_peer( const fc::ip::endpoint& ep )
{
}
void node::configure( const node_config& cfg )
{
listen_on_endpoint( cfg.server_endpoint, wait_if_not_available );
/** don't allow node to go out of scope until accept loop exits */
auto self = shared_from_this();
_accept_loop_complete = fc::async( [self](){ self->accept_loop(); } )
}
void node::accept_loop()
{
auto self = shared_from_this();
while( !_accept_loop_complete.canceled() )
{
try {
auto new_peer = std::make_shared<peer_connection>(self);
_tcp_server.accept( new_peer.get_socket() );
if( _accept_loop_complete.canceled() )
return;
_peers.insert( new_peer );
// limit the rate at which we accept connections to mitigate DOS attacks
fc::usleep( fc::milliseconds(10) );
} FC_CAPTURE_AND_RETHROW()
}
} // accept_loop()
void node::listen_on_endpoint( fc::ip::endpoint ep, bool wait_if_not_available )
{
if( ep.port() != 0 )
{
// if the user specified a port, we only want to bind to it if it's not already
// being used by another application. During normal operation, we set the
// SO_REUSEADDR/SO_REUSEPORT flags so that we can bind outbound sockets to the
// same local endpoint as we're listening on here. On some platforms, setting
// those flags will prevent us from detecting that other applications are
// listening on that port. We'd like to detect that, so we'll set up a temporary
// tcp server without that flag to see if we can listen on that port.
bool first = true;
for( ;; )
{
bool listen_failed = false;
try
{
fc::tcp_server temporary_server;
if( listen_endpoint.get_address() != fc::ip::address() )
temporary_server.listen( ep );
else
temporary_server.listen( ep.port() );
break;
}
catch ( const fc::exception&)
{
listen_failed = true;
}
if (listen_failed)
{
if( wait_if_endpoint_is_busy )
{
std::ostringstream error_message_stream;
if( first )
{
error_message_stream << "Unable to listen for connections on port "
<< ep.port()
<< ", retrying in a few seconds\n";
error_message_stream << "You can wait for it to become available, or restart "
"this program using\n";
error_message_stream << "the --p2p-port option to specify another port\n";
first = false;
}
else
{
error_message_stream << "\nStill waiting for port " << listen_endpoint.port() << " to become available\n";
}
std::string error_message = error_message_stream.str();
ulog(error_message);
fc::usleep( fc::seconds(5 ) );
}
else // don't wait, just find a random port
{
wlog( "unable to bind on the requested endpoint ${endpoint}, "
"which probably means that endpoint is already in use",
( "endpoint", ep ) );
ep.set_port( 0 );
}
} // if (listen_failed)
} // for(;;)
} // if (listen_endpoint.port() != 0)
_tcp_server.set_reuse_address();
try
{
if( ep.get_address() != fc::ip::address() )
_tcp_server.listen( ep );
else
_tcp_server.listen( ep.port() );
_actual_listening_endpoint = _tcp_server.get_local_endpoint();
ilog( "listening for connections on endpoint ${endpoint} (our first choice)",
( "endpoint", _actual_listening_endpoint ) );
}
catch ( fc::exception& e )
{
FC_RETHROW_EXCEPTION( e, error,
"unable to listen on ${endpoint}", ("endpoint",listen_endpoint ) );
}
}
} }

View file

@ -0,0 +1,7 @@
#include <graphene/p2p/peer_connection.hpp>
namespace graphene { namespace p2p {
} } //graphene::p2p

View file

@ -0,0 +1,181 @@
/*
* Copyright (c) 2015, Cryptonomex, Inc.
* All rights reserved.
*
* This source code is provided for evaluation in private test networks only, until September 8, 2015. After this date, this license expires and
* the code may not be used, modified or distributed for any purpose. Redistribution and use in source and binary forms, with or without modification,
* are permitted until September 8, 2015, provided that the following conditions are met:
*
* 1. The code and/or derivative works are used only for private test networks consisting of no more than 10 P2P nodes.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <assert.h>
#include <algorithm>
#include <fc/crypto/hex.hpp>
#include <fc/crypto/aes.hpp>
#include <fc/crypto/city.hpp>
#include <fc/log/logger.hpp>
#include <fc/network/ip.hpp>
#include <fc/exception/exception.hpp>
#include <graphene/p2p/stcp_socket.hpp>
namespace graphene { namespace p2p {
stcp_socket::stcp_socket()
//:_buf_len(0)
#ifndef NDEBUG
: _read_buffer_in_use(false),
_write_buffer_in_use(false)
#endif
{
}
stcp_socket::~stcp_socket()
{
}
void stcp_socket::do_key_exchange()
{
_priv_key = fc::ecc::private_key::generate();
fc::ecc::public_key pub = _priv_key.get_public_key();
fc::ecc::public_key_data s = pub.serialize();
std::shared_ptr<char> serialized_key_buffer(new char[sizeof(fc::ecc::public_key_data)], [](char* p){ delete[] p; });
memcpy(serialized_key_buffer.get(), (char*)&s, sizeof(fc::ecc::public_key_data));
_sock.write( serialized_key_buffer, sizeof(fc::ecc::public_key_data) );
_sock.read( serialized_key_buffer, sizeof(fc::ecc::public_key_data) );
fc::ecc::public_key_data rpub;
memcpy((char*)&rpub, serialized_key_buffer.get(), sizeof(fc::ecc::public_key_data));
_shared_secret = _priv_key.get_shared_secret( rpub );
// ilog("shared secret ${s}", ("s", shared_secret) );
_send_aes.init( fc::sha256::hash( (char*)&_shared_secret, sizeof(_shared_secret) ),
fc::city_hash_crc_128((char*)&_shared_secret,sizeof(_shared_secret) ) );
_recv_aes.init( fc::sha256::hash( (char*)&_shared_secret, sizeof(_shared_secret) ),
fc::city_hash_crc_128((char*)&_shared_secret,sizeof(_shared_secret) ) );
}
void stcp_socket::connect_to( const fc::ip::endpoint& remote_endpoint )
{
_sock.connect_to( remote_endpoint );
do_key_exchange();
}
void stcp_socket::bind( const fc::ip::endpoint& local_endpoint )
{
_sock.bind(local_endpoint);
}
/**
* This method must read at least 16 bytes at a time from
* the underlying TCP socket so that it can decrypt them. It
* will buffer any left-over.
*/
size_t stcp_socket::readsome( char* buffer, size_t len )
{ try {
assert( len > 0 && (len % 16) == 0 );
#ifndef NDEBUG
// This code was written with the assumption that you'd only be making one call to readsome
// at a time so it reuses _read_buffer. If you really need to make concurrent calls to
// readsome(), you'll need to prevent reusing _read_buffer here
struct check_buffer_in_use {
bool& _buffer_in_use;
check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
} buffer_in_use_checker(_read_buffer_in_use);
#endif
const size_t read_buffer_length = 4096;
if (!_read_buffer)
_read_buffer.reset(new char[read_buffer_length], [](char* p){ delete[] p; });
len = std::min<size_t>(read_buffer_length, len);
size_t s = _sock.readsome( _read_buffer, len, 0 );
if( s % 16 )
{
_sock.read(_read_buffer, 16 - (s%16), s);
s += 16-(s%16);
}
_recv_aes.decode( _read_buffer.get(), s, buffer );
return s;
} FC_RETHROW_EXCEPTIONS( warn, "", ("len",len) ) }
size_t stcp_socket::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
{
return readsome(buf.get() + offset, len);
}
bool stcp_socket::eof()const
{
return _sock.eof();
}
size_t stcp_socket::writesome( const char* buffer, size_t len )
{ try {
assert( len > 0 && (len % 16) == 0 );
#ifndef NDEBUG
// This code was written with the assumption that you'd only be making one call to writesome
// at a time so it reuses _write_buffer. If you really need to make concurrent calls to
// writesome(), you'll need to prevent reusing _write_buffer here
struct check_buffer_in_use {
bool& _buffer_in_use;
check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
} buffer_in_use_checker(_write_buffer_in_use);
#endif
const std::size_t write_buffer_length = 4096;
if (!_write_buffer)
_write_buffer.reset(new char[write_buffer_length], [](char* p){ delete[] p; });
len = std::min<size_t>(write_buffer_length, len);
memset(_write_buffer.get(), 0, len); // just in case aes.encode screws up
/**
* every sizeof(crypt_buf) bytes the aes channel
* has an error and doesn't decrypt properly... disable
* for now because we are going to upgrade to something
* better.
*/
uint32_t ciphertext_len = _send_aes.encode( buffer, len, _write_buffer.get() );
assert(ciphertext_len == len);
_sock.write( _write_buffer, ciphertext_len );
return ciphertext_len;
} FC_RETHROW_EXCEPTIONS( warn, "", ("len",len) ) }
size_t stcp_socket::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
{
return writesome(buf.get() + offset, len);
}
void stcp_socket::flush()
{
_sock.flush();
}
void stcp_socket::close()
{
try
{
_sock.close();
}FC_RETHROW_EXCEPTIONS( warn, "error closing stcp socket" );
}
void stcp_socket::accept()
{
do_key_exchange();
}
}} // namespace graphene::p2p