/* * Copyright (c) 2015, Cryptonomex, Inc. * All rights reserved. * * This source code is provided for evaluation in private test networks only, until September 8, 2015. After this date, this license expires and * the code may not be used, modified or distributed for any purpose. Redistribution and use in source and binary forms, with or without modification, * are permitted until September 8, 2015, provided that the following conditions are met: * * 1. The code and/or derivative works are used only for private test networks consisting of no more than 10 P2P nodes. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace graphene { namespace net { struct firewall_check_state_data { node_id_t expected_node_id; fc::ip::endpoint endpoint_to_test; // if we're coordinating a firewall check for another node, these are the helper // nodes we've already had do the test (if this structure is still relevant, that // that means they have all had indeterminate results std::set nodes_already_tested; // If we're a just a helper node, this is the node we report back to // when we have a result node_id_t requesting_peer; }; class peer_connection; class peer_connection_delegate { public: virtual void on_message(peer_connection* originating_peer, const message& received_message) = 0; virtual void on_connection_closed(peer_connection* originating_peer) = 0; virtual message get_message_for_item(const item_id& item) = 0; }; class peer_connection; typedef std::shared_ptr peer_connection_ptr; class peer_connection : public message_oriented_connection_delegate, public std::enable_shared_from_this { public: enum class our_connection_state { disconnected, just_connected, // if in this state, we have sent a hello_message connection_accepted, // remote side has sent us a connection_accepted, we're operating normally with them connection_rejected // remote side has sent us a connection_rejected, we may be exchanging address with them or may just be waiting for them to close }; enum class their_connection_state { disconnected, just_connected, // we have not yet received a hello_message connection_accepted, // we have sent them a connection_accepted connection_rejected // we have sent them a connection_rejected }; enum class connection_negotiation_status { disconnected, connecting, connected, accepting, accepted, hello_sent, peer_connection_accepted, peer_connection_rejected, negotiation_complete, closing, closed }; private: peer_connection_delegate* _node; fc::optional _remote_endpoint; message_oriented_connection _message_connection; /* a base class for messages on the queue, to hide the fact that some * messages are complete messages and some are only hashes of messages. */ struct queued_message { fc::time_point enqueue_time; fc::time_point transmission_start_time; fc::time_point transmission_finish_time; queued_message(fc::time_point enqueue_time = fc::time_point::now()) : enqueue_time(enqueue_time) {} virtual message get_message(peer_connection_delegate* node) = 0; /** returns roughly the number of bytes of memory the message is consuming while * it is sitting on the queue */ virtual size_t get_size_in_queue() = 0; virtual ~queued_message() {} }; /* when you queue up a 'real_queued_message', a full copy of the message is * stored on the heap until it is sent */ struct real_queued_message : queued_message { message message_to_send; size_t message_send_time_field_offset; real_queued_message(message message_to_send, size_t message_send_time_field_offset = (size_t)-1) : message_to_send(std::move(message_to_send)), message_send_time_field_offset(message_send_time_field_offset) {} message get_message(peer_connection_delegate* node) override; size_t get_size_in_queue() override; }; /* when you queue up a 'virtual_queued_message', we just queue up the hash of the * item we want to send. When it reaches the top of the queue, we make a callback * to the node to generate the message. */ struct virtual_queued_message : queued_message { item_id item_to_send; virtual_queued_message(item_id item_to_send) : item_to_send(std::move(item_to_send)) {} message get_message(peer_connection_delegate* node) override; size_t get_size_in_queue() override; }; size_t _total_queued_messages_size; std::queue, std::list > > _queued_messages; fc::future _send_queued_messages_done; public: fc::time_point connection_initiation_time; fc::time_point connection_closed_time; fc::time_point connection_terminated_time; peer_connection_direction direction; //connection_state state; firewalled_state is_firewalled; fc::microseconds clock_offset; fc::microseconds round_trip_delay; our_connection_state our_state; bool they_have_requested_close; their_connection_state their_state; bool we_have_requested_close; connection_negotiation_status negotiation_status; fc::oexception connection_closed_error; fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); } fc::time_point get_connection_terminated_time()const { return connection_terminated_time; } /// data about the peer node /// @{ /** node_public_key from the hello message, zero-initialized before we get the hello */ node_id_t node_public_key; /** the unique identifier we'll use to refer to the node with. zero-initialized before * we receive the hello message, at which time it will be filled with either the "node_id" * from the user_data field of the hello, or if none is present it will be filled with a * copy of node_public_key */ node_id_t node_id; uint32_t core_protocol_version; std::string user_agent; fc::optional graphene_git_revision_sha; fc::optional graphene_git_revision_unix_timestamp; fc::optional fc_git_revision_sha; fc::optional fc_git_revision_unix_timestamp; fc::optional platform; fc::optional bitness; // for inbound connections, these fields record what the peer sent us in // its hello message. For outbound, they record what we sent the peer // in our hello message fc::ip::address inbound_address; uint16_t inbound_port; uint16_t outbound_port; /// @} typedef std::unordered_map item_to_time_map_type; /// blockchain synchronization state data /// @{ boost::container::deque ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about std::set ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids bool peer_needs_sync_items_from_us; bool we_need_sync_items_from_peer; fc::optional > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy() item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects uint32_t last_block_number_delegate_has_seen; /// the number of the last block this peer has told us about that the delegate knows (ids_of_items_to_get[0] should be the id of block [this value + 1]) item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows fc::time_point_sec last_block_time_delegate_has_seen; bool inhibit_fetching_sync_blocks; /// @} /// non-synchronization state data /// @{ struct timestamped_item_id { item_id item; fc::time_point_sec timestamp; timestamped_item_id(const item_id& item, const fc::time_point_sec timestamp) : item(item), timestamp(timestamp) {} }; struct timestamp_index{}; typedef boost::multi_index_container, std::hash >, boost::multi_index::ordered_non_unique, boost::multi_index::member > > > timestamped_items_set_type; timestamped_items_set_type inventory_peer_advertised_to_us; timestamped_items_set_type inventory_advertised_to_peer; item_to_time_map_type items_requested_from_peer; /// items we've requested from this peer during normal operation. fetch from another peer if this peer disconnects /// @} // if they're flooding us with transactions, we set this to avoid fetching for a few seconds to let the // blockchain catch up fc::time_point transaction_fetching_inhibited_until; uint32_t last_known_fork_block_number; fc::future accept_or_connect_task_done; firewall_check_state_data *firewall_check_state; #ifndef NDEBUG private: fc::thread* _thread; unsigned _send_message_queue_tasks_running; // temporary debugging #endif private: peer_connection(peer_connection_delegate* delegate); void destroy(); public: static peer_connection_ptr make_shared(peer_connection_delegate* delegate); // use this instead of the constructor virtual ~peer_connection(); fc::tcp_socket& get_socket(); void accept_connection(); void connect_to(const fc::ip::endpoint& remote_endpoint, fc::optional local_endpoint = fc::optional()); void on_message(message_oriented_connection* originating_connection, const message& received_message) override; void on_connection_closed(message_oriented_connection* originating_connection) override; void send_queueable_message(std::unique_ptr&& message_to_send); void send_message(const message& message_to_send, size_t message_send_time_field_offset = (size_t)-1); void send_item(const item_id& item_to_send); void close_connection(); void destroy_connection(); uint64_t get_total_bytes_sent() const; uint64_t get_total_bytes_received() const; fc::time_point get_last_message_sent_time() const; fc::time_point get_last_message_received_time() const; fc::optional get_remote_endpoint(); fc::ip::endpoint get_local_endpoint(); void set_remote_endpoint(fc::optional new_remote_endpoint); bool busy(); bool idle(); bool is_transaction_fetching_inhibited() const; fc::sha512 get_shared_secret() const; void clear_old_inventory(); bool is_inventory_advertised_to_us_list_full_for_transactions() const; bool is_inventory_advertised_to_us_list_full() const; bool performing_firewall_check() const; fc::optional get_endpoint_for_connecting() const; private: void send_queued_messages_task(); void accept_connection_task(); void connect_to_task(const fc::ip::endpoint& remote_endpoint); }; typedef std::shared_ptr peer_connection_ptr; } } // end namespace graphene::net // not sent over the wire, just reflected for logging FC_REFLECT_ENUM(graphene::net::peer_connection::our_connection_state, (disconnected) (just_connected) (connection_accepted) (connection_rejected)) FC_REFLECT_ENUM(graphene::net::peer_connection::their_connection_state, (disconnected) (just_connected) (connection_accepted) (connection_rejected)) FC_REFLECT_ENUM(graphene::net::peer_connection::connection_negotiation_status, (disconnected) (connecting) (connected) (accepting) (accepted) (hello_sent) (peer_connection_accepted) (peer_connection_rejected) (negotiation_complete) (closing) (closed) ) FC_REFLECT( graphene::net::peer_connection::timestamped_item_id, (item)(timestamp));