From a84e56c2aac6b4edfe9b36b50fbdfa11692e70c6 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Thu, 3 Sep 2015 17:43:26 -0400 Subject: [PATCH] fix market subscriptions --- libraries/app/api.cpp | 2 -- libraries/p2p/design.md | 8 +++++++- libraries/p2p/include/graphene/p2p/message.hpp | 12 +++++++++++- .../p2p/include/graphene/p2p/peer_connection.hpp | 16 +++++++++++++--- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/libraries/app/api.cpp b/libraries/app/api.cpp index ed7809f8..798dc460 100644 --- a/libraries/app/api.cpp +++ b/libraries/app/api.cpp @@ -874,7 +874,6 @@ namespace graphene { namespace app { } } - /* if( _market_subscriptions.size() ) { if( !_subscribe_callback ) @@ -890,7 +889,6 @@ namespace graphene { namespace app { } } } - */ } auto capture_this = shared_from_this(); diff --git a/libraries/p2p/design.md b/libraries/p2p/design.md index d55c1411..96653d7e 100644 --- a/libraries/p2p/design.md +++ b/libraries/p2p/design.md @@ -61,7 +61,10 @@ Each node implements the following protocol: send( peer, block_summary ) - onConnect( new_peer, new_peer_head_block_num ) + onHello( new_peer, new_peer_head_block_num ) + + replyHello( new_peer ) // ack the hello message with our timestamp to measure latency + if( peers.size() >= max_peers ) send( new_peer, peers ) disconnect( new_peer ) @@ -73,6 +76,9 @@ Each node implements the following protocol: new_peer.synced = true for( peer : peers ) send( peer, new_peer ) + + onHelloReply( from_peer, hello_reply ) + update_latency_measure, disconnect if too slow onReceivePeers( from_peer, peers ) addToPotentialPeers( peers ) diff --git a/libraries/p2p/include/graphene/p2p/message.hpp b/libraries/p2p/include/graphene/p2p/message.hpp index 926180d1..2ffa456e 100644 --- a/libraries/p2p/include/graphene/p2p/message.hpp +++ b/libraries/p2p/include/graphene/p2p/message.hpp @@ -8,6 +8,7 @@ #include namespace graphene { namespace p2p { + using namespace graphene::chain; struct message_header { @@ -94,16 +95,25 @@ namespace graphene { namespace p2p { std::string user_agent; uint16_t version; + fc::time_point timestamp; fc::ip::address inbound_address; uint16_t inbound_port; uint16_t outbound_port; - node_id_t node_public_key; + public_key_type node_public_key; fc::sha256 chain_id; fc::variant_object user_data; block_id_type head_block; }; + struct hello_reply_message + { + static const core_message_type_enum type; + + fc::time_point hello_timestamp; + fc::time_point reply_timestamp; + }; + struct transaction_message { static const core_message_type_enum type; diff --git a/libraries/p2p/include/graphene/p2p/peer_connection.hpp b/libraries/p2p/include/graphene/p2p/peer_connection.hpp index 8f0ab594..759a7132 100644 --- a/libraries/p2p/include/graphene/p2p/peer_connection.hpp +++ b/libraries/p2p/include/graphene/p2p/peer_connection.hpp @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -42,16 +43,25 @@ namespace graphene { namespace p2p { class peer_connection_delegate { public: - virtual void on_message(peer_connection* originating_peer, - const message& received_message) = 0; + virtual void on_message(peer_connection* originating_peer, const message& received_message) = 0; virtual void on_connection_closed(peer_connection* originating_peer) = 0; - virtual message get_message_for_item(const item_id& item) = 0; }; class peer_connection; typedef std::shared_ptr peer_connection_ptr; + /** + * Each connection maintains its own queue of messages to be sent, when an item + * is first pushed to the queue it starts an async fiber that will sequentially write + * all items until there is nothing left to be sent. + * + * If a particular connection is unable to keep up with the real-time stream of + * messages to be sent then it will be disconnected. The backlog will be measured in + * seconds. + * + * A multi-index container that tracks the + */ class peer_connection : public message_oriented_connection_delegate, public std::enable_shared_from_this {