From 74b707999cc608abb628727fc75e24f6fe622029 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Fri, 27 Mar 2015 16:29:33 -0400 Subject: [PATCH] fix bugs with websocket and integrate API support --- CMakeLists.txt | 2 - include/fc/network/http/websocket.hpp | 32 +++--- include/fc/rpc/api_connection.hpp | 40 ++++--- include/fc/rpc/websocket_api.hpp | 31 +++--- src/network/http/websocket.cpp | 144 +++++++++++++++++++------- tests/api.cpp | 61 +++++++++-- 6 files changed, 211 insertions(+), 99 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 90cca3f..91567fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -240,8 +240,6 @@ add_executable( api tests/api.cpp ) target_link_libraries( api fc ) include_directories( vendor/websocketpp ) -add_executable( websockettest tests/websocket.cpp ) -target_link_libraries( websockettest fc ) add_executable( ntp_test ntp_test.cpp ) target_link_libraries( ntp_test fc ) diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp index 19eca30..5d0e554 100644 --- a/include/fc/network/http/websocket.hpp +++ b/include/fc/network/http/websocket.hpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace fc { namespace http { namespace detail { @@ -14,25 +15,20 @@ namespace fc { namespace http { public: virtual ~websocket_connection(){}; virtual void send_message( const std::string& message ) = 0; + virtual void close( int64_t code, const std::string& reason ){}; + void on_message( const std::string& message ) { _on_message(message); } + + void on_message_handler( const std::function& h ) { _on_message = h; } + + void set_session_data( fc::any d ){ _session_data = std::move(d); } + fc::any& get_session_data() { return _session_data; } + private: + fc::any _session_data; + std::function _on_message; }; typedef std::shared_ptr websocket_connection_ptr; - class websocket_session - { - public: - websocket_session( const websocket_connection_ptr& con ) - :_connection(con){} - - virtual ~websocket_session(){}; - virtual void on_message( const std::string& message ) = 0; - - void send_message( const std::string& message ) { _connection->send_message(message); } - private: - websocket_connection_ptr _connection; - }; - typedef std::shared_ptr websocket_session_ptr; - - typedef std::function< websocket_session_ptr( const websocket_connection_ptr& ) > session_factory; + typedef std::function on_connection_handler; class websocket_server { @@ -40,7 +36,7 @@ namespace fc { namespace http { websocket_server(); ~websocket_server(); - void on_connection( const session_factory& factory ); + void on_connection( const on_connection_handler& handler); void listen( uint16_t port ); void start_accept(); @@ -55,7 +51,7 @@ namespace fc { namespace http { websocket_client(); ~websocket_client(); - websocket_session_ptr connect( const std::string& uri, const session_factory& ); + websocket_connection_ptr connect( const std::string& uri ); private: std::unique_ptr my; }; diff --git a/include/fc/rpc/api_connection.hpp b/include/fc/rpc/api_connection.hpp index 8975e77..4f97922 100644 --- a/include/fc/rpc/api_connection.hpp +++ b/include/fc/rpc/api_connection.hpp @@ -111,15 +111,11 @@ namespace fc { } /** makes calls to the remote server */ - virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) - { - FC_ASSERT( _remote_connection ); - return _remote_connection->receive_call( api_id, method_name, args ); - } + virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) = 0; variant receive_call( api_id_type api_id, const string& method_name, const variants& args = variants() )const { - wdump( (api_id)(method_name)(args) ); + //wdump( (api_id)(method_name)(args) ); FC_ASSERT( _local_apis.size() > api_id ); return _local_apis[api_id]->call( method_name, args ); } @@ -131,19 +127,8 @@ namespace fc { return _local_apis.size() - 1; } - void set_remote_connection( const std::shared_ptr& rc ) - { - FC_ASSERT( !_remote_connection ); - FC_ASSERT( rc != this->shared_from_this() ); - _remote_connection = rc; - if( _remote_connection && _remote_connection->remote_connection() != this->shared_from_this() ) - _remote_connection->set_remote_connection( this->shared_from_this() ); - } - const std::shared_ptr& remote_connection()const { return _remote_connection; } - private: std::vector< std::unique_ptr > _local_apis; - std::shared_ptr _remote_connection; struct api_visitor @@ -187,6 +172,27 @@ namespace fc { }; }; + class local_api_connection : public api_connection + { + public: + /** makes calls to the remote server */ + virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) override + { + FC_ASSERT( _remote_connection ); + return _remote_connection->receive_call( api_id, method_name, args ); + } + + void set_remote_connection( const std::shared_ptr& rc ) + { + FC_ASSERT( !_remote_connection ); + FC_ASSERT( rc != this->shared_from_this() ); + _remote_connection = rc; + } + const std::shared_ptr& remote_connection()const { return _remote_connection; } + + std::shared_ptr _remote_connection; + }; + template detail::generic_api::generic_api( const Api& a, const std::shared_ptr& c ) :_api_connection(c),_api(a) diff --git a/include/fc/rpc/websocket_api.hpp b/include/fc/rpc/websocket_api.hpp index b275391..aa2f3f6 100644 --- a/include/fc/rpc/websocket_api.hpp +++ b/include/fc/rpc/websocket_api.hpp @@ -7,31 +7,32 @@ namespace fc { namespace rpc { - class websocket_api : public api_connection, public fc::rpc::state, public fc::http::websocket_session + class websocket_api_connection : public api_connection { public: - websocket_api( fc::http::websocket_connection_ptr c ) - :fc::http::websocket_session(c) + websocket_api_connection( fc::http::websocket_connection_ptr c ) + :_connection(c) { - add_method( "call", [this]( const variants& args ) -> variant { + _rpc_state.add_method( "call", [this]( const variants& args ) -> variant { FC_ASSERT( args.size() == 3 && args[2].is_array() ); return this->receive_call( args[0].as_uint64(), - args[1].as_string(), - args[2].get_array() ); + args[1].as_string(), + args[2].get_array() ); }); + _connection->on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); } virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) override { - auto request = this->start_remote_call( "call", {api_id, method_name, args} ); - send_message( fc::json::to_string(request) ); - return wait_for_response( *request.id ); + auto request = _rpc_state.start_remote_call( "call", {api_id, method_name, args} ); + _connection->send_message( fc::json::to_string(request) ); + return _rpc_state.wait_for_response( *request.id ); } protected: - virtual void on_message( const std::string& message ) + void on_message( const std::string& message ) { auto var = fc::json::from_string(message); const auto& var_obj = var.get_object(); @@ -39,26 +40,28 @@ namespace fc { namespace rpc { { auto call = var.as(); try { - auto result = local_call( call.method, call.params ); + auto result = _rpc_state.local_call( call.method, call.params ); if( call.id ) { - send_message( fc::json::to_string( response( *call.id, result ) ) ); + _connection->send_message( fc::json::to_string( response( *call.id, result ) ) ); } } catch ( const fc::exception& e ) { if( call.id ) { - send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + _connection->send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); } } } else { auto reply = var.as(); - handle_reply( reply ); + _rpc_state.handle_reply( reply ); } } + fc::http::websocket_connection_ptr _connection; + fc::rpc::state _rpc_state; }; } } // namespace fc::rpc diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index bef4baa..740f603 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -13,8 +14,55 @@ namespace fc { namespace http { namespace detail { + + struct asio_with_stub_log : public websocketpp::config::asio { + + typedef asio_with_stub_log type; + typedef asio base; + + typedef base::concurrency_type concurrency_type; + + typedef base::request_type request_type; + typedef base::response_type response_type; + + typedef base::message_type message_type; + typedef base::con_msg_manager_type con_msg_manager_type; + typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; + + /// Custom Logging policies + /*typedef websocketpp::log::syslog elog_type; + typedef websocketpp::log::syslog alog_type; + */ + //typedef base::alog_type alog_type; + //typedef base::elog_type elog_type; + typedef websocketpp::log::stub elog_type; + typedef websocketpp::log::stub alog_type; + + typedef base::rng_type rng_type; + + struct transport_config : public base::transport_config { + typedef type::concurrency_type concurrency_type; + typedef type::alog_type alog_type; + typedef type::elog_type elog_type; + typedef type::request_type request_type; + typedef type::response_type response_type; + typedef websocketpp::transport::asio::basic_socket::endpoint + socket_type; + }; + + typedef websocketpp::transport::asio::endpoint + transport_type; + + static const long timeout_open_handshake = 0; + }; + + + + using websocketpp::connection_hdl; - typedef websocketpp::server websocket_server_type; + typedef websocketpp::server websocket_server_type; template class websocket_connection_impl : public websocket_connection @@ -23,35 +71,38 @@ namespace fc { namespace http { websocket_connection_impl( T con ) :_ws_connection(con){} - virtual void send_message( const std::string& message ) + virtual void send_message( const std::string& message )override { _ws_connection->send( message ); } + virtual void close( int64_t code, const std::string& reason )override + { + _ws_connection->close(code,reason); + } T _ws_connection; }; - class websocket_server_impl { public: websocket_server_impl() :_server_thread( fc::thread::current() ) { + _server.clear_access_channels( websocketpp::log::alevel::all ); _server.init_asio(&fc::asio::default_io_service()); _server.set_reuse_addr(true); _server.set_open_handler( [&]( connection_hdl hdl ){ _server_thread.async( [&](){ - wlog( "on open server" ); auto new_con = std::make_shared>( _server.get_con_from_hdl(hdl) ); - _connections[hdl] = _factory( new_con ); + _on_connection( _connections[hdl] = new_con ); }).wait(); }); _server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){ _server_thread.async( [&](){ auto current_con = _connections.find(hdl); assert( current_con != _connections.end() ); - wdump(("server")(msg->get_payload())); + //wdump(("server")(msg->get_payload())); current_con->second->on_message( msg->get_payload() ); }).wait(); }); @@ -62,22 +113,34 @@ namespace fc { namespace http { }); _server.set_fail_handler( [&]( connection_hdl hdl ){ - _server_thread.async( [&](){ - _connections.erase( hdl ); - }).wait(); + if( _server.is_listening() ) + { + _server_thread.async( [&](){ + _connections.erase( hdl ); + }).wait(); + } }); } + ~websocket_server_impl() + { + if( _server.is_listening() ) + _server.stop_listening(); + auto cpy_con = _connections; + for( auto item : cpy_con ) + _server.close( item.first, 0, "server exit" ); + } - typedef std::map > con_map; + typedef std::map > con_map; - con_map _connections; - fc::thread& _server_thread; - websocket_server_type _server; - session_factory _factory; + con_map _connections; + fc::thread& _server_thread; + websocket_server_type _server; + on_connection_handler _on_connection; + fc::promise::ptr _closed; }; - typedef websocketpp::client websocket_client_type; - typedef websocket_client_type::connection_ptr websocket_client_connection_type; + typedef websocketpp::client websocket_client_type; + typedef websocket_client_type::connection_ptr websocket_client_connection_type; class websocket_client_impl { @@ -87,33 +150,40 @@ namespace fc { namespace http { websocket_client_impl() :_client_thread( fc::thread::current() ) { - _client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ - elog( "default open client" ); - }); + _client.clear_access_channels( websocketpp::log::alevel::all ); _client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){ - wlog("start wait"); _client_thread.async( [&](){ - wdump((msg->get_payload())); - _session->on_message( msg->get_payload() ); + // wdump((msg->get_payload())); + _connection->on_message( msg->get_payload() ); }).wait(); - wlog("done wait"); }); _client.set_close_handler( [=]( connection_hdl hdl ){ - wlog("start wait"); - _client_thread.async( [&](){ _session.reset(); } ).wait(); - wlog("done wait"); + _client_thread.async( [&](){ _connection.reset(); } ).wait(); + if( _closed ) _closed->set_value(); }); _client.set_fail_handler( [=]( connection_hdl hdl ){ - wlog("start wait"); - _client_thread.async( [&](){ _session.reset(); } ).wait(); - wlog("done wait"); + auto con = _client.get_con_from_hdl(hdl); + auto message = con->get_ec().message(); + _client_thread.async( [&](){ _connection.reset(); } ).wait(); + if( _connected && !_connected->ready() ) + _connected->set_exception( exception_ptr( new FC_EXCEPTION( exception, "${message}", ("message",message)) ) ); + if( _closed ) + _closed->set_value(); }); _client.init_asio( &fc::asio::default_io_service() ); } + ~websocket_client_impl() + { + if(_connection ) + { + _connection->close(0, "client closed"); + _closed->wait(); + } + } fc::promise::ptr _connected; + fc::promise::ptr _closed; fc::thread& _client_thread; websocket_client_type _client; - websocket_session_ptr _session; websocket_connection_ptr _connection; }; } // namespace detail @@ -121,9 +191,9 @@ namespace fc { namespace http { websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {} websocket_server::~websocket_server(){} - void websocket_server::on_connection( const session_factory& factory ) + void websocket_server::on_connection( const on_connection_handler& handler ) { - my->_factory = factory; + my->_on_connection = handler; } void websocket_server::listen( uint16_t port ) @@ -136,10 +206,10 @@ namespace fc { namespace http { } websocket_client::websocket_client():my( new detail::websocket_client_impl() ) {} - websocket_client::~websocket_client(){} - websocket_session_ptr websocket_client::connect( const std::string& uri, const session_factory& factory ) + websocket_client::~websocket_client(){ } + websocket_connection_ptr websocket_client::connect( const std::string& uri ) { try { - wlog( "connecting to ${uri}", ("uri",uri)); + // wlog( "connecting to ${uri}", ("uri",uri)); websocketpp::lib::error_code ec; my->_connected = fc::promise::ptr( new fc::promise("websocket::connect") ); @@ -147,7 +217,7 @@ namespace fc { namespace http { my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ auto con = my->_client.get_con_from_hdl(hdl); my->_connection = std::make_shared>( con ); - my->_session = factory( my->_connection ); + my->_closed = fc::promise::ptr( new fc::promise("websocket::closed") ); my->_connected->set_value(); }); @@ -158,7 +228,7 @@ namespace fc { namespace http { } my->_client.connect(con); my->_connected->wait(); - return my->_session; + return my->_connection; } FC_CAPTURE_AND_RETHROW( (uri) ) } } } // fc::http diff --git a/tests/api.cpp b/tests/api.cpp index aedd6aa..c9661a8 100644 --- a/tests/api.cpp +++ b/tests/api.cpp @@ -1,6 +1,7 @@ #include #include #include +#include class calculator { @@ -12,7 +13,7 @@ class calculator FC_API( calculator, (add)(sub) ) -class nested_api +class login_api { public: fc::api get_calc()const @@ -22,7 +23,7 @@ class nested_api } fc::optional> calc; }; -FC_API( nested_api, (get_calc) ); +FC_API( login_api, (get_calc) ); using namespace fc; @@ -39,9 +40,46 @@ class variant_calculator double sub( fc::variant a, fc::variant b ) { return a.as_double()-b.as_double(); } }; +using namespace fc::http; +using namespace fc::rpc; int main( int argc, char** argv ) { + { + fc::api calc_api( std::make_shared() ); + + fc::http::websocket_server server; + server.on_connection([&]( const websocket_connection_ptr& c ){ + auto wsc = std::make_shared(c); + auto login = std::make_shared(); + login->calc = calc_api; + wsc->register_api(fc::api(login)); + c->set_session_data( wsc ); + }); + + server.listen( 8090 ); + server.start_accept(); + + for( uint32_t i = 0; i < 5000; ++i ) + { + try { + fc::http::websocket_client client; + auto con = client.connect( "ws://localhost:8090" ); + auto apic = std::make_shared(con); + auto remote_login_api = apic->get_remote_api(); + auto remote_calc = remote_login_api->get_calc(); + wdump((remote_calc->add( 4, 5 ))); + } catch ( const fc::exception& e ) + { + edump((e.to_detail_string())); + } + } + wlog( "exit scope" ); + } + wlog( "returning now..." ); + + return 0; + some_calculator calc; variant_calculator vcalc; @@ -78,9 +116,9 @@ int main( int argc, char** argv ) ilog( "------------------ NESTED TEST --------------" ); try { - nested_api napi_impl; + login_api napi_impl; napi_impl.calc = api_calc; - fc::api napi(&napi_impl); + fc::api napi(&napi_impl); fc::api_server server; auto api_id = server.register_api( napi ); @@ -93,9 +131,9 @@ int main( int argc, char** argv ) fc::api serv( &server ); - fc::api_client apic( serv ); + fc::api_client apic( serv ); - fc::api remote_api = apic; + fc::api remote_api = apic; auto remote_calc = remote_api->get_calc(); @@ -110,18 +148,19 @@ int main( int argc, char** argv ) ilog( "------------------ NESTED TEST --------------" ); try { - nested_api napi_impl; + login_api napi_impl; napi_impl.calc = api_calc; - fc::api napi(&napi_impl); + fc::api napi(&napi_impl); - auto client_side = std::make_shared(); - auto server_side = std::make_shared(); + auto client_side = std::make_shared(); + auto server_side = std::make_shared(); server_side->set_remote_connection( client_side ); + client_side->set_remote_connection( server_side ); server_side->register_api( napi ); - fc::api remote_api = client_side->get_remote_api(); + fc::api remote_api = client_side->get_remote_api(); auto remote_calc = remote_api->get_calc(); int r = remote_calc->add( 4, 5 );