diff --git a/libraries/p2p/CMakeLists.txt b/libraries/p2p/CMakeLists.txt new file mode 100644 index 00000000..6b5918d5 --- /dev/null +++ b/libraries/p2p/CMakeLists.txt @@ -0,0 +1,32 @@ +file(GLOB HEADERS "include/graphene/p2p/*.hpp") + +set(SOURCES node.cpp + stcp_socket.cpp + peer_connection.cpp + message_oriented_connection.cpp) + +add_library( graphene_p2p ${SOURCES} ${HEADERS} ) + +target_link_libraries( graphene_p2p + PUBLIC fc graphene_db ) +target_include_directories( graphene_p2p + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" + PRIVATE "${CMAKE_SOURCE_DIR}/libraries/chain/include" +) + +#if(MSVC) +# set_source_files_properties( node.cpp PROPERTIES COMPILE_FLAGS "/bigobj" ) +#endif(MSVC) + +#if (USE_PCH) +# set_target_properties(graphene_p2p PROPERTIES COTIRE_ADD_UNITY_BUILD FALSE) +# cotire(graphene_p2p ) +#endif(USE_PCH) + +install( TARGETS + graphene_p2p + + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) diff --git a/libraries/p2p/design.md b/libraries/p2p/design.md new file mode 100644 index 00000000..d55c1411 --- /dev/null +++ b/libraries/p2p/design.md @@ -0,0 +1,90 @@ +# Network Protocol 2 + +Building a low-latency network requires P2P nodes that have low-latency +connections and a protocol designed to minimize latency. for the purpose +of this document we will assume that two nodes are located on opposite +sides of the globe with a ping time of 250ms. + + +## Announce, Request, Send Protocol +Under the prior network archtiecture, transactions and blocks were broadcast +in a manner similar to the Bitcoin protocol: inventory messages notify peers of +transactions and blocks, then peers fetch the transaction or block from one +peer. After validating the item a node will broadcast an inventory message to +its peers. + +Under this model it will take 0.75 seconds for a peer to communicate a transaction +or block to another peer even if their size was 0 and there was no processing overhead. +This level of performance is unacceptable for a network attempting to produce one block +every second. + +This prior protocol also sent every transaction twice: initial broadcast, and again as +part of a block. + + +## Push Protocol +To minimize latency each node needs to immediately broadcast the data it receives +to its peers after validating it. Given the average transaction size is less than +100 bytes, it is almost as effecient to send the transaction as it is to send +the notice (assuming a 20 byte transaction id) + +Each node implements the following protocol: + + + onReceiveTransaction( from_peer, transaction ) + if( isKnown( transaction.id() ) ) + return + + markKnown( transaction.id() ) + + if( !validate( transaction ) ) + return + + for( peer : peers ) + if( peer != from_peer ) + send( peer, transaction ) + + + onReceiveBlock( from_peer, block_summary ) + if( isKnown( block_summary ) + return + + full_block = reconstructFullBlcok( from_peer, block_summary ) + if( !full_block ) disconnect from_peer + + markKnown( block_summary ) + + if( !pushBlock( full_block ) ) disconnect from_peer + + for( peer : peers ) + if( peer != from_peer ) + send( peer, block_summary ) + + + onConnect( new_peer, new_peer_head_block_num ) + if( peers.size() >= max_peers ) + send( new_peer, peers ) + disconnect( new_peer ) + return + + while( new_peer_head_block_num < our_head_block_num ) + sendFullBlock( new_peer, ++new_peer_head_block_num ) + + new_peer.synced = true + for( peer : peers ) + send( peer, new_peer ) + + onReceivePeers( from_peer, peers ) + addToPotentialPeers( peers ) + + onUpdateConnectionsTimer + if( peers.size() < desired_peers ) + connect( random_potential_peer ) + + onFullBlock( from_peer, full_block ) + if( !pushBlock( full_block ) ) disconnect from_peer + + onStartup + init_potential_peers from config + start onUpdateConnectionsTimer + diff --git a/libraries/p2p/include/graphene/p2p/message.hpp b/libraries/p2p/include/graphene/p2p/message.hpp new file mode 100644 index 00000000..926180d1 --- /dev/null +++ b/libraries/p2p/include/graphene/p2p/message.hpp @@ -0,0 +1,151 @@ +/** Copyright (c) 2015, Cryptonomex, Inc. All rights reserved. */ +#pragma once +#include +#include +#include +#include +#include +#include + +namespace graphene { namespace p2p { + + struct message_header + { + uint32_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE + uint32_t msg_type; + }; + + typedef fc::uint160_t message_hash_type; + + /** + * Abstracts the process of packing/unpacking a message for a + * particular channel. + */ + struct message : public message_header + { + std::vector data; + + message(){} + + message( message&& m ) + :message_header(m),data( std::move(m.data) ){} + + message( const message& m ) + :message_header(m),data( m.data ){} + + /** + * Assumes that T::type specifies the message type + */ + template + message( const T& m ) + { + msg_type = T::type; + data = fc::raw::pack(m); + size = (uint32_t)data.size(); + } + + fc::uint160_t id()const + { + return fc::ripemd160::hash( data.data(), (uint32_t)data.size() ); + } + + /** + * Automatically checks the type and deserializes T in the + * opposite process from the constructor. + */ + template + T as()const + { + try { + FC_ASSERT( msg_type == T::type ); + T tmp; + if( data.size() ) + { + fc::datastream ds( data.data(), data.size() ); + fc::raw::unpack( ds, tmp ); + } + else + { + // just to make sure that tmp shouldn't have any data + fc::datastream ds( nullptr, 0 ); + fc::raw::unpack( ds, tmp ); + } + return tmp; + } FC_RETHROW_EXCEPTIONS( warn, + "error unpacking network message as a '${type}' ${x} !=? ${msg_type}", + ("type", fc::get_typename::name() ) + ("x", T::type) + ("msg_type", msg_type) + ); + } + }; + + enum core_message_type_enum { + hello_message_type = 1000, + transaction_message_type = 1001, + block_message_type = 1002, + peer_message_type = 1003, + error_message_type = 1004 + }; + + struct hello_message + { + static const core_message_type_enum type; + + std::string user_agent; + uint16_t version; + + fc::ip::address inbound_address; + uint16_t inbound_port; + uint16_t outbound_port; + node_id_t node_public_key; + fc::sha256 chain_id; + fc::variant_object user_data; + block_id_type head_block; + }; + + struct transaction_message + { + static const core_message_type_enum type; + signed_transaction trx; + }; + + struct block_summary_message + { + static const core_message_type_enum type; + + signed_block_header header; + vector transaction_ids; + }; + + struct full_block_message + { + static const core_message_type_enum type; + signed_block block; + }; + + struct peers_message + { + static const core_message_type_enum type; + + vector peers; + }; + + struct error_message + { + static const core_message_type_enum type; + string message; + }; + + +} } // graphene::p2p + +FC_REFLECT( graphene::p2p::message_header, (size)(msg_type) ) +FC_REFLECT_DERIVED( graphene::p2p::message, (graphene::p2p::message_header), (data) ) +FC_REFLECT_ENUM( graphene::p2p::core_message_type_enum, + (hello_message_type) + (transaction_message_type) + (block_message_type) + (peer_message_type) + (error_message_type) +) diff --git a/libraries/p2p/include/graphene/p2p/message_oriented_connection.hpp b/libraries/p2p/include/graphene/p2p/message_oriented_connection.hpp new file mode 100644 index 00000000..82b73195 --- /dev/null +++ b/libraries/p2p/include/graphene/p2p/message_oriented_connection.hpp @@ -0,0 +1,49 @@ +/** Copyright (c) 2015, Cryptonomex, Inc. All rights reserved. */ +#pragma once +#include +#include + +namespace graphene { namespace p2p { + + namespace detail { class message_oriented_connection_impl; } + + class message_oriented_connection; + + /** receives incoming messages from a message_oriented_connection object */ + class message_oriented_connection_delegate + { + public: + virtual void on_message( message_oriented_connection* originating_connection, + const message& received_message) = 0; + + virtual void on_connection_closed(message_oriented_connection* originating_connection) = 0; + }; + + /** uses a secure socket to create a connection that reads and writes a stream of `fc::p2p::message` objects */ + class message_oriented_connection + { + public: + message_oriented_connection(message_oriented_connection_delegate* delegate = nullptr); + ~message_oriented_connection(); + fc::tcp_socket& get_socket(); + + void accept(); + void bind(const fc::ip::endpoint& local_endpoint); + void connect_to(const fc::ip::endpoint& remote_endpoint); + + void send_message(const message& message_to_send); + void close_connection(); + void destroy_connection(); + + uint64_t get_total_bytes_sent() const; + uint64_t get_total_bytes_received() const; + fc::time_point get_last_message_sent_time() const; + fc::time_point get_last_message_received_time() const; + fc::time_point get_connection_time() const; + fc::sha512 get_shared_secret() const; + private: + std::unique_ptr my; + }; + typedef std::shared_ptr message_oriented_connection_ptr; + +} } // graphene::net diff --git a/libraries/p2p/include/graphene/p2p/node.hpp b/libraries/p2p/include/graphene/p2p/node.hpp new file mode 100644 index 00000000..aa7f5e46 --- /dev/null +++ b/libraries/p2p/include/graphene/p2p/node.hpp @@ -0,0 +1,74 @@ +/** Copyright (c) 2015, Cryptonomex, Inc. */ + +#pragma once +#include +#include + + + +namespace graphene { namespace p2p { + using namespace graphene::chain; + + struct node_config + { + fc::ip::endpoint server_endpoint; + bool wait_if_not_available = true; + uint32_t desired_peers; + uint32_t max_peers; + /** receive, but don't rebroadcast data */ + bool subscribe_only = false; + public_key_type node_id; + vector seed_nodes; + }; + + struct by_remote_endpoint; + struct by_peer_id; + + /** + * @ingroup object_index + */ + typedef multi_index_container< + peer_connection_ptr, + indexed_by< + ordered_unique< tag, + const_mem_fun< peer_connection, fc::ip::endpoint, &peer_connection::get_remote_endpoint > >, + ordered_unique< tag, member< peer_connection, public_key_type, &peer_connection::node_id > > + > + > peer_connection_index; + + + class node : public std::enable_shared_from_this + { + public: + server( chain_database& db ); + + void add_peer( const fc::ip::endpoint& ep ); + void configure( const node_config& cfg ); + + void on_incomming_connection( peer_connection_ptr new_peer ); + void on_hello( peer_connection_ptr new_peer, hello_message m ); + void on_transaction( peer_connection_ptr from_peer, transaction_message m ); + void on_block( peer_connection_ptr from_peer, block_message m ); + void on_peers( peer_connection_ptr from_peer, peers_message m ); + void on_error( peer_connection_ptr from_peer, error_message m ); + void on_full_block( peer_connection_ptr from_peer, full_block_message m ); + void on_update_connections(); + + private: + /** + * Specifies the network interface and port upon which incoming + * connections should be accepted. + */ + void listen_on_endpoint( fc::ip::endpoint ep, bool wait_if_not_available ); + void accept_loop(); + + graphene::chain::database& _db; + + fc::tcp_server _tcp_server; + fc::ip::endpoint _actual_listening_endpoint; + fc::future _accept_loop_complete; + peer_connection_index _peers; + + }; + +} } /// graphene::p2p diff --git a/libraries/p2p/include/graphene/p2p/peer_connection.hpp b/libraries/p2p/include/graphene/p2p/peer_connection.hpp new file mode 100644 index 00000000..8f0ab594 --- /dev/null +++ b/libraries/p2p/include/graphene/p2p/peer_connection.hpp @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2015, Cryptonomex, Inc. + * All rights reserved. + * + * This source code is provided for evaluation in private test networks only, until September 8, 2015. After this date, this license expires and + * the code may not be used, modified or distributed for any purpose. Redistribution and use in source and binary forms, with or without modification, + * are permitted until September 8, 2015, provided that the following conditions are met: + * + * 1. The code and/or derivative works are used only for private test networks consisting of no more than 10 P2P nodes. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace graphene { namespace p2p { + + class peer_connection; + class peer_connection_delegate + { + public: + virtual void on_message(peer_connection* originating_peer, + const message& received_message) = 0; + virtual void on_connection_closed(peer_connection* originating_peer) = 0; + virtual message get_message_for_item(const item_id& item) = 0; + }; + + class peer_connection; + typedef std::shared_ptr peer_connection_ptr; + + + class peer_connection : public message_oriented_connection_delegate, + public std::enable_shared_from_this + { + public: + enum direction_type { inbound, outbound }; + enum connection_state { + connecting = 0, + syncing = 1, + synced = 2 + }; + + fc::time_point connection_initiation_time; + fc::time_point connection_closed_time; + fc::time_point connection_terminated_time; + direction_type direction = outbound; + connection_state state = connecting; + bool is_firewalled = true + + //connection_state state; + fc::microseconds clock_offset; + fc::microseconds round_trip_delay; + + /// data about the peer node + /// @{ + + /** the unique identifier we'll use to refer to the node with. zero-initialized before + * we receive the hello message, at which time it will be filled with either the "node_id" + * from the user_data field of the hello, or if none is present it will be filled with a + * copy of node_public_key */ + public_key_type node_id; + uint32_t core_protocol_version; + std::string user_agent; + + fc::optional graphene_git_revision_sha; + fc::optional graphene_git_revision_unix_timestamp; + fc::optional fc_git_revision_sha; + fc::optional fc_git_revision_unix_timestamp; + fc::optional platform; + fc::optional bitness; + + // for inbound connections, these fields record what the peer sent us in + // its hello message. For outbound, they record what we sent the peer + // in our hello message + fc::ip::address inbound_address; + uint16_t inbound_port; + uint16_t outbound_port; + /// @} + + void send( transaction_message_ptr msg ) + { + // if not in sent_or_received then insert into _pending_send + // if process_send_queue is invalid or complete then + // async process_send_queue + } + + void received_transaction( const transaction_id_type& id ) + { + _sent_or_received.insert(id); + } + + void process_send_queue() + { + // while _pending_send.size() || _pending_blocks.size() + // while there are pending blocks, then take the oldest + // for each transaction id, verify that it exists in _sent_or_received + // else find it in the _pending_send queue and send it + // send one from _pending_send + } + + + std::unordered_map _pending_send; + /// todo: make multi-index that tracks how long items have been cached and removes them + /// after a resasonable period of time (say 10 seconds) + std::unordered_set _sent_or_received; + std::map _pending_blocks; + + + fc::ip::endpoint get_remote_endpoint()const + { return get_socket().get_remote_endpoint(); } + + void on_message(message_oriented_connection* originating_connection, + const message& received_message) override + { + switch( core_message_type_enum( received_message.type ) ) + { + case hello_message_type: + _node->on_hello( shared_from_this(), + received_message.as() ); + break; + case transaction_message_type: + _node->on_transaction( shared_from_this(), + received_message.as() ); + break; + case block_message_type: + _node->on_block( shared_from_this(), + received_message.as() ); + break; + case peer_message_type: + _node->on_peers( shared_from_this(), + received_message.as() ); + break; + } + } + + void on_connection_closed(message_oriented_connection* originating_connection) override + { + _node->on_close( shared_from_this() ); + } + + fc::tcp_socket& get_socket() { return _message_connection.get_socket(); } + + private: + peer_connection_delegate* _node; + fc::optional _remote_endpoint; + message_oriented_connection _message_connection; + + }; + typedef std::shared_ptr peer_connection_ptr; + + + } } // end namespace graphene::p2p + +// not sent over the wire, just reflected for logging +FC_REFLECT_ENUM(graphene::p2p::peer_connection::connection_state, (connecting)(syncing)(synced) ) +FC_REFLECT_ENUM(graphene::p2p::peer_connection::direction_type, (inbound)(outbound) ) diff --git a/libraries/p2p/include/graphene/p2p/stcp_socket.hpp b/libraries/p2p/include/graphene/p2p/stcp_socket.hpp new file mode 100644 index 00000000..01e6df34 --- /dev/null +++ b/libraries/p2p/include/graphene/p2p/stcp_socket.hpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2015, Cryptonomex, Inc. All rights reserved. + */ +#pragma once +#include +#include +#include + +namespace graphene { namespace p2p { + +/** + * Uses ECDH to negotiate a aes key for communicating + * with other nodes on the network. + */ +class stcp_socket : public virtual fc::iostream +{ + public: + stcp_socket(); + ~stcp_socket(); + fc::tcp_socket& get_socket() { return _sock; } + void accept(); + + void connect_to( const fc::ip::endpoint& remote_endpoint ); + void bind( const fc::ip::endpoint& local_endpoint ); + + virtual size_t readsome( char* buffer, size_t max ); + virtual size_t readsome( const std::shared_ptr& buf, size_t len, size_t offset ); + virtual bool eof()const; + + virtual size_t writesome( const char* buffer, size_t len ); + virtual size_t writesome( const std::shared_ptr& buf, size_t len, size_t offset ); + + virtual void flush(); + virtual void close(); + + using istream::get; + void get( char& c ) { read( &c, 1 ); } + fc::sha512 get_shared_secret() const { return _shared_secret; } + private: + void do_key_exchange(); + + fc::sha512 _shared_secret; + fc::ecc::private_key _priv_key; + fc::array _buf; + //uint32_t _buf_len; + fc::tcp_socket _sock; + fc::aes_encoder _send_aes; + fc::aes_decoder _recv_aes; + std::shared_ptr _read_buffer; + std::shared_ptr _write_buffer; +#ifndef NDEBUG + bool _read_buffer_in_use; + bool _write_buffer_in_use; +#endif +}; + +typedef std::shared_ptr stcp_socket_ptr; + +} } // graphene::p2p diff --git a/libraries/p2p/message_oriented_connection.cpp b/libraries/p2p/message_oriented_connection.cpp new file mode 100644 index 00000000..a17ec541 --- /dev/null +++ b/libraries/p2p/message_oriented_connection.cpp @@ -0,0 +1,393 @@ +/* + * Copyright (c) 2015, Cryptonomex, Inc. + * All rights reserved. + */ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#ifdef DEFAULT_LOGGER +# undef DEFAULT_LOGGER +#endif +#define DEFAULT_LOGGER "p2p" + +#ifndef NDEBUG +# define VERIFY_CORRECT_THREAD() assert(_thread->is_current()) +#else +# define VERIFY_CORRECT_THREAD() do {} while (0) +#endif + +namespace graphene { namespace p2p { + namespace detail + { + class message_oriented_connection_impl + { + private: + message_oriented_connection* _self; + message_oriented_connection_delegate *_delegate; + stcp_socket _sock; + fc::future _read_loop_done; + uint64_t _bytes_received; + uint64_t _bytes_sent; + + fc::time_point _connected_time; + fc::time_point _last_message_received_time; + fc::time_point _last_message_sent_time; + + bool _send_message_in_progress; + +#ifndef NDEBUG + fc::thread* _thread; +#endif + + void read_loop(); + void start_read_loop(); + public: + fc::tcp_socket& get_socket(); + void accept(); + void connect_to(const fc::ip::endpoint& remote_endpoint); + void bind(const fc::ip::endpoint& local_endpoint); + + message_oriented_connection_impl(message_oriented_connection* self, + message_oriented_connection_delegate* delegate = nullptr); + ~message_oriented_connection_impl(); + + void send_message(const message& message_to_send); + void close_connection(); + void destroy_connection(); + + uint64_t get_total_bytes_sent() const; + uint64_t get_total_bytes_received() const; + + fc::time_point get_last_message_sent_time() const; + fc::time_point get_last_message_received_time() const; + fc::time_point get_connection_time() const { return _connected_time; } + fc::sha512 get_shared_secret() const; + }; + + message_oriented_connection_impl::message_oriented_connection_impl(message_oriented_connection* self, + message_oriented_connection_delegate* delegate) + : _self(self), + _delegate(delegate), + _bytes_received(0), + _bytes_sent(0), + _send_message_in_progress(false) +#ifndef NDEBUG + ,_thread(&fc::thread::current()) +#endif + { + } + message_oriented_connection_impl::~message_oriented_connection_impl() + { + VERIFY_CORRECT_THREAD(); + destroy_connection(); + } + + fc::tcp_socket& message_oriented_connection_impl::get_socket() + { + VERIFY_CORRECT_THREAD(); + return _sock.get_socket(); + } + + void message_oriented_connection_impl::accept() + { + VERIFY_CORRECT_THREAD(); + _sock.accept(); + assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops + _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop"); + } + + void message_oriented_connection_impl::connect_to(const fc::ip::endpoint& remote_endpoint) + { + VERIFY_CORRECT_THREAD(); + _sock.connect_to(remote_endpoint); + assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops + _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop"); + } + + void message_oriented_connection_impl::bind(const fc::ip::endpoint& local_endpoint) + { + VERIFY_CORRECT_THREAD(); + _sock.bind(local_endpoint); + } + + + void message_oriented_connection_impl::read_loop() + { + VERIFY_CORRECT_THREAD(); + const int BUFFER_SIZE = 16; + const int LEFTOVER = BUFFER_SIZE - sizeof(message_header); + static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer"); + + _connected_time = fc::time_point::now(); + + fc::oexception exception_to_rethrow; + bool call_on_connection_closed = false; + + try + { + message m; + while( true ) + { + char buffer[BUFFER_SIZE]; + _sock.read(buffer, BUFFER_SIZE); + _bytes_received += BUFFER_SIZE; + memcpy((char*)&m, buffer, sizeof(message_header)); + + FC_ASSERT( m.size <= MAX_MESSAGE_SIZE, "", ("m.size",m.size)("MAX_MESSAGE_SIZE",MAX_MESSAGE_SIZE) ); + + size_t remaining_bytes_with_padding = 16 * ((m.size - LEFTOVER + 15) / 16); + m.data.resize(LEFTOVER + remaining_bytes_with_padding); //give extra 16 bytes to allow for padding added in send call + std::copy(buffer + sizeof(message_header), buffer + sizeof(buffer), m.data.begin()); + if (remaining_bytes_with_padding) + { + _sock.read(&m.data[LEFTOVER], remaining_bytes_with_padding); + _bytes_received += remaining_bytes_with_padding; + } + m.data.resize(m.size); // truncate off the padding bytes + + _last_message_received_time = fc::time_point::now(); + + try + { + // message handling errors are warnings... + _delegate->on_message(_self, m); + } + /// Dedicated catches needed to distinguish from general fc::exception + catch ( const fc::canceled_exception& e ) { throw e; } + catch ( const fc::eof_exception& e ) { throw e; } + catch ( const fc::exception& e) + { + /// Here loop should be continued so exception should be just caught locally. + wlog( "message transmission failed ${er}", ("er", e.to_detail_string() ) ); + throw; + } + } + } + catch ( const fc::canceled_exception& e ) + { + wlog( "caught a canceled_exception in read_loop. this should mean we're in the process of deleting this object already, so there's no need to notify the delegate: ${e}", ("e", e.to_detail_string() ) ); + throw; + } + catch ( const fc::eof_exception& e ) + { + wlog( "disconnected ${e}", ("e", e.to_detail_string() ) ); + call_on_connection_closed = true; + } + catch ( const fc::exception& e ) + { + elog( "disconnected ${er}", ("er", e.to_detail_string() ) ); + call_on_connection_closed = true; + exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.to_detail_string()))); + } + catch ( const std::exception& e ) + { + elog( "disconnected ${er}", ("er", e.what() ) ); + call_on_connection_closed = true; + exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.what()))); + } + catch ( ... ) + { + elog( "unexpected exception" ); + call_on_connection_closed = true; + exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", fc::except_str()))); + } + + if (call_on_connection_closed) + _delegate->on_connection_closed(_self); + + if (exception_to_rethrow) + throw *exception_to_rethrow; + } + + void message_oriented_connection_impl::send_message(const message& message_to_send) + { + VERIFY_CORRECT_THREAD(); +#if 0 // this gets too verbose +#ifndef NDEBUG + fc::optional remote_endpoint; + if (_sock.get_socket().is_open()) + remote_endpoint = _sock.get_socket().remote_endpoint(); + struct scope_logger { + const fc::optional& endpoint; + scope_logger(const fc::optional& endpoint) : endpoint(endpoint) { dlog("entering message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); } + ~scope_logger() { dlog("leaving message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); } + } send_message_scope_logger(remote_endpoint); +#endif +#endif + struct verify_no_send_in_progress { + bool& var; + verify_no_send_in_progress(bool& var) : var(var) + { + if (var) + elog("Error: two tasks are calling message_oriented_connection::send_message() at the same time"); + assert(!var); + var = true; + } + ~verify_no_send_in_progress() { var = false; } + } _verify_no_send_in_progress(_send_message_in_progress); + + try + { + size_t size_of_message_and_header = sizeof(message_header) + message_to_send.size; + if( message_to_send.size > MAX_MESSAGE_SIZE ) + elog("Trying to send a message larger than MAX_MESSAGE_SIZE. This probably won't work..."); + //pad the message we send to a multiple of 16 bytes + size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16); + std::unique_ptr padded_message(new char[size_with_padding]); + memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header)); + memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size ); + _sock.write(padded_message.get(), size_with_padding); + _sock.flush(); + _bytes_sent += size_with_padding; + _last_message_sent_time = fc::time_point::now(); + } FC_RETHROW_EXCEPTIONS( warn, "unable to send message" ); + } + + void message_oriented_connection_impl::close_connection() + { + VERIFY_CORRECT_THREAD(); + _sock.close(); + } + + void message_oriented_connection_impl::destroy_connection() + { + VERIFY_CORRECT_THREAD(); + + fc::optional remote_endpoint; + if (_sock.get_socket().is_open()) + remote_endpoint = _sock.get_socket().remote_endpoint(); + ilog( "in destroy_connection() for ${endpoint}", ("endpoint", remote_endpoint) ); + + if (_send_message_in_progress) + elog("Error: message_oriented_connection is being destroyed while a send_message is in progress. " + "The task calling send_message() should have been canceled already"); + assert(!_send_message_in_progress); + + try + { + _read_loop_done.cancel_and_wait(__FUNCTION__); + } + catch ( const fc::exception& e ) + { + wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring: ${e}", ("e",e) ); + } + catch (...) + { + wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring" ); + } + } + + uint64_t message_oriented_connection_impl::get_total_bytes_sent() const + { + VERIFY_CORRECT_THREAD(); + return _bytes_sent; + } + + uint64_t message_oriented_connection_impl::get_total_bytes_received() const + { + VERIFY_CORRECT_THREAD(); + return _bytes_received; + } + + fc::time_point message_oriented_connection_impl::get_last_message_sent_time() const + { + VERIFY_CORRECT_THREAD(); + return _last_message_sent_time; + } + + fc::time_point message_oriented_connection_impl::get_last_message_received_time() const + { + VERIFY_CORRECT_THREAD(); + return _last_message_received_time; + } + + fc::sha512 message_oriented_connection_impl::get_shared_secret() const + { + VERIFY_CORRECT_THREAD(); + return _sock.get_shared_secret(); + } + + } // end namespace graphene::p2p::detail + + + message_oriented_connection::message_oriented_connection(message_oriented_connection_delegate* delegate) : + my(new detail::message_oriented_connection_impl(this, delegate)) + { + } + + message_oriented_connection::~message_oriented_connection() + { + } + + fc::tcp_socket& message_oriented_connection::get_socket() + { + return my->get_socket(); + } + + void message_oriented_connection::accept() + { + my->accept(); + } + + void message_oriented_connection::connect_to(const fc::ip::endpoint& remote_endpoint) + { + my->connect_to(remote_endpoint); + } + + void message_oriented_connection::bind(const fc::ip::endpoint& local_endpoint) + { + my->bind(local_endpoint); + } + + void message_oriented_connection::send_message(const message& message_to_send) + { + my->send_message(message_to_send); + } + + void message_oriented_connection::close_connection() + { + my->close_connection(); + } + + void message_oriented_connection::destroy_connection() + { + my->destroy_connection(); + } + + uint64_t message_oriented_connection::get_total_bytes_sent() const + { + return my->get_total_bytes_sent(); + } + + uint64_t message_oriented_connection::get_total_bytes_received() const + { + return my->get_total_bytes_received(); + } + + fc::time_point message_oriented_connection::get_last_message_sent_time() const + { + return my->get_last_message_sent_time(); + } + + fc::time_point message_oriented_connection::get_last_message_received_time() const + { + return my->get_last_message_received_time(); + } + fc::time_point message_oriented_connection::get_connection_time() const + { + return my->get_connection_time(); + } + fc::sha512 message_oriented_connection::get_shared_secret() const + { + return my->get_shared_secret(); + } + +} } // end namespace graphene::p2p diff --git a/libraries/p2p/node.cpp b/libraries/p2p/node.cpp new file mode 100644 index 00000000..5a8da3a4 --- /dev/null +++ b/libraries/p2p/node.cpp @@ -0,0 +1,141 @@ +#include + +namespace graphene { namespace p2p { + + node::node( chain_database& db ) + :_db(db) + { + + } + + node::~node() + { + + } + + void node::add_peer( const fc::ip::endpoint& ep ) + { + + } + + void node::configure( const node_config& cfg ) + { + listen_on_endpoint( cfg.server_endpoint, wait_if_not_available ); + + /** don't allow node to go out of scope until accept loop exits */ + auto self = shared_from_this(); + _accept_loop_complete = fc::async( [self](){ self->accept_loop(); } ) + } + + void node::accept_loop() + { + auto self = shared_from_this(); + while( !_accept_loop_complete.canceled() ) + { + try { + auto new_peer = std::make_shared(self); + _tcp_server.accept( new_peer.get_socket() ); + + if( _accept_loop_complete.canceled() ) + return; + + _peers.insert( new_peer ); + + + + // limit the rate at which we accept connections to mitigate DOS attacks + fc::usleep( fc::milliseconds(10) ); + } FC_CAPTURE_AND_RETHROW() + } + } // accept_loop() + + + + void node::listen_on_endpoint( fc::ip::endpoint ep, bool wait_if_not_available ) + { + if( ep.port() != 0 ) + { + // if the user specified a port, we only want to bind to it if it's not already + // being used by another application. During normal operation, we set the + // SO_REUSEADDR/SO_REUSEPORT flags so that we can bind outbound sockets to the + // same local endpoint as we're listening on here. On some platforms, setting + // those flags will prevent us from detecting that other applications are + // listening on that port. We'd like to detect that, so we'll set up a temporary + // tcp server without that flag to see if we can listen on that port. + bool first = true; + for( ;; ) + { + bool listen_failed = false; + + try + { + fc::tcp_server temporary_server; + if( listen_endpoint.get_address() != fc::ip::address() ) + temporary_server.listen( ep ); + else + temporary_server.listen( ep.port() ); + break; + } + catch ( const fc::exception&) + { + listen_failed = true; + } + + if (listen_failed) + { + if( wait_if_endpoint_is_busy ) + { + std::ostringstream error_message_stream; + if( first ) + { + error_message_stream << "Unable to listen for connections on port " + << ep.port() + << ", retrying in a few seconds\n"; + error_message_stream << "You can wait for it to become available, or restart " + "this program using\n"; + error_message_stream << "the --p2p-port option to specify another port\n"; + first = false; + } + else + { + error_message_stream << "\nStill waiting for port " << listen_endpoint.port() << " to become available\n"; + } + + std::string error_message = error_message_stream.str(); + ulog(error_message); + fc::usleep( fc::seconds(5 ) ); + } + else // don't wait, just find a random port + { + wlog( "unable to bind on the requested endpoint ${endpoint}, " + "which probably means that endpoint is already in use", + ( "endpoint", ep ) ); + ep.set_port( 0 ); + } + } // if (listen_failed) + } // for(;;) + } // if (listen_endpoint.port() != 0) + + + _tcp_server.set_reuse_address(); + try + { + if( ep.get_address() != fc::ip::address() ) + _tcp_server.listen( ep ); + else + _tcp_server.listen( ep.port() ); + + _actual_listening_endpoint = _tcp_server.get_local_endpoint(); + ilog( "listening for connections on endpoint ${endpoint} (our first choice)", + ( "endpoint", _actual_listening_endpoint ) ); + } + catch ( fc::exception& e ) + { + FC_RETHROW_EXCEPTION( e, error, + "unable to listen on ${endpoint}", ("endpoint",listen_endpoint ) ); + } + } + + + +} } diff --git a/libraries/p2p/peer_connection.cpp b/libraries/p2p/peer_connection.cpp new file mode 100644 index 00000000..605113b1 --- /dev/null +++ b/libraries/p2p/peer_connection.cpp @@ -0,0 +1,7 @@ +#include + +namespace graphene { namespace p2p { + +} } //graphene::p2p + + diff --git a/libraries/p2p/stcp_socket.cpp b/libraries/p2p/stcp_socket.cpp new file mode 100644 index 00000000..7112cc34 --- /dev/null +++ b/libraries/p2p/stcp_socket.cpp @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2015, Cryptonomex, Inc. + * All rights reserved. + * + * This source code is provided for evaluation in private test networks only, until September 8, 2015. After this date, this license expires and + * the code may not be used, modified or distributed for any purpose. Redistribution and use in source and binary forms, with or without modification, + * are permitted until September 8, 2015, provided that the following conditions are met: + * + * 1. The code and/or derivative works are used only for private test networks consisting of no more than 10 P2P nodes. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace graphene { namespace p2p { + +stcp_socket::stcp_socket() +//:_buf_len(0) +#ifndef NDEBUG + : _read_buffer_in_use(false), + _write_buffer_in_use(false) +#endif +{ +} +stcp_socket::~stcp_socket() +{ +} + +void stcp_socket::do_key_exchange() +{ + _priv_key = fc::ecc::private_key::generate(); + fc::ecc::public_key pub = _priv_key.get_public_key(); + fc::ecc::public_key_data s = pub.serialize(); + std::shared_ptr serialized_key_buffer(new char[sizeof(fc::ecc::public_key_data)], [](char* p){ delete[] p; }); + memcpy(serialized_key_buffer.get(), (char*)&s, sizeof(fc::ecc::public_key_data)); + _sock.write( serialized_key_buffer, sizeof(fc::ecc::public_key_data) ); + _sock.read( serialized_key_buffer, sizeof(fc::ecc::public_key_data) ); + fc::ecc::public_key_data rpub; + memcpy((char*)&rpub, serialized_key_buffer.get(), sizeof(fc::ecc::public_key_data)); + + _shared_secret = _priv_key.get_shared_secret( rpub ); +// ilog("shared secret ${s}", ("s", shared_secret) ); + _send_aes.init( fc::sha256::hash( (char*)&_shared_secret, sizeof(_shared_secret) ), + fc::city_hash_crc_128((char*)&_shared_secret,sizeof(_shared_secret) ) ); + _recv_aes.init( fc::sha256::hash( (char*)&_shared_secret, sizeof(_shared_secret) ), + fc::city_hash_crc_128((char*)&_shared_secret,sizeof(_shared_secret) ) ); +} + + +void stcp_socket::connect_to( const fc::ip::endpoint& remote_endpoint ) +{ + _sock.connect_to( remote_endpoint ); + do_key_exchange(); +} + +void stcp_socket::bind( const fc::ip::endpoint& local_endpoint ) +{ + _sock.bind(local_endpoint); +} + +/** + * This method must read at least 16 bytes at a time from + * the underlying TCP socket so that it can decrypt them. It + * will buffer any left-over. + */ +size_t stcp_socket::readsome( char* buffer, size_t len ) +{ try { + assert( len > 0 && (len % 16) == 0 ); + +#ifndef NDEBUG + // This code was written with the assumption that you'd only be making one call to readsome + // at a time so it reuses _read_buffer. If you really need to make concurrent calls to + // readsome(), you'll need to prevent reusing _read_buffer here + struct check_buffer_in_use { + bool& _buffer_in_use; + check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; } + ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; } + } buffer_in_use_checker(_read_buffer_in_use); +#endif + + const size_t read_buffer_length = 4096; + if (!_read_buffer) + _read_buffer.reset(new char[read_buffer_length], [](char* p){ delete[] p; }); + + len = std::min(read_buffer_length, len); + + size_t s = _sock.readsome( _read_buffer, len, 0 ); + if( s % 16 ) + { + _sock.read(_read_buffer, 16 - (s%16), s); + s += 16-(s%16); + } + _recv_aes.decode( _read_buffer.get(), s, buffer ); + return s; +} FC_RETHROW_EXCEPTIONS( warn, "", ("len",len) ) } + +size_t stcp_socket::readsome( const std::shared_ptr& buf, size_t len, size_t offset ) +{ + return readsome(buf.get() + offset, len); +} + +bool stcp_socket::eof()const +{ + return _sock.eof(); +} + +size_t stcp_socket::writesome( const char* buffer, size_t len ) +{ try { + assert( len > 0 && (len % 16) == 0 ); + +#ifndef NDEBUG + // This code was written with the assumption that you'd only be making one call to writesome + // at a time so it reuses _write_buffer. If you really need to make concurrent calls to + // writesome(), you'll need to prevent reusing _write_buffer here + struct check_buffer_in_use { + bool& _buffer_in_use; + check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; } + ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; } + } buffer_in_use_checker(_write_buffer_in_use); +#endif + + const std::size_t write_buffer_length = 4096; + if (!_write_buffer) + _write_buffer.reset(new char[write_buffer_length], [](char* p){ delete[] p; }); + len = std::min(write_buffer_length, len); + memset(_write_buffer.get(), 0, len); // just in case aes.encode screws up + /** + * every sizeof(crypt_buf) bytes the aes channel + * has an error and doesn't decrypt properly... disable + * for now because we are going to upgrade to something + * better. + */ + uint32_t ciphertext_len = _send_aes.encode( buffer, len, _write_buffer.get() ); + assert(ciphertext_len == len); + _sock.write( _write_buffer, ciphertext_len ); + return ciphertext_len; +} FC_RETHROW_EXCEPTIONS( warn, "", ("len",len) ) } + +size_t stcp_socket::writesome( const std::shared_ptr& buf, size_t len, size_t offset ) +{ + return writesome(buf.get() + offset, len); +} + +void stcp_socket::flush() +{ + _sock.flush(); +} + + +void stcp_socket::close() +{ + try + { + _sock.close(); + }FC_RETHROW_EXCEPTIONS( warn, "error closing stcp socket" ); +} + +void stcp_socket::accept() +{ + do_key_exchange(); +} + + +}} // namespace graphene::p2p +