From bee73bd491fd31440c6c0ad1ad8e914ad1902898 Mon Sep 17 00:00:00 2001 From: abitmore Date: Sat, 2 May 2020 11:17:07 -0400 Subject: [PATCH 1/9] Refactor websocket_client, fix duplicate code Fix duplicate code issue in websocket_client::connect() and websocket_client::secure_connect(), and other minor issues --- include/fc/network/http/websocket.hpp | 2 +- src/network/http/websocket.cpp | 119 +++++++++++++------------- 2 files changed, 62 insertions(+), 59 deletions(-) diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp index 0c765c1..5816a57 100644 --- a/include/fc/network/http/websocket.hpp +++ b/include/fc/network/http/websocket.hpp @@ -101,7 +101,7 @@ namespace fc { namespace http { private: std::unique_ptr my; std::unique_ptr smy; - std::vector> headers; + fc::http::headers _headers; }; } } diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index b2997d9..c9bc8e8 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -499,16 +499,32 @@ namespace fc { namespace http { }).wait(); }); _client.set_close_handler( [=]( connection_hdl hdl ){ - _client_thread.async( [&](){ if( _connection ) {_connection->closed(); _connection.reset();} } ).wait(); - if( _closed ) _closed->set_value(); + _client_thread.async( [&](){ + if( _connection ) { + _connection->closed(); + _connection.reset(); + } + } ).wait(); + if( _closed ) + _closed->set_value(); }); _client.set_fail_handler( [=]( connection_hdl hdl ){ auto con = _client.get_con_from_hdl(hdl); auto message = con->get_ec().message(); if( _connection ) - _client_thread.async( [&](){ if( _connection ) _connection->closed(); _connection.reset(); } ).wait(); + { + _client_thread.async( [&](){ + if( _connection ) { + _connection->closed(); + _connection.reset(); + } + } ).wait(); + } if( _connected && !_connected->ready() ) - _connected->set_exception( exception_ptr( new FC_EXCEPTION( exception, "${message}", ("message",message)) ) ); + { + _connected->set_exception( exception_ptr( + new FC_EXCEPTION( exception, "${message}", ("message",message)) ) ); + } if( _closed ) _closed->set_value(); }); @@ -525,6 +541,37 @@ namespace fc { namespace http { if( _closed ) _closed->wait(); } + + websocket_connection_ptr connect( const std::string& uri, const fc::http::headers& headers ) + { + websocketpp::lib::error_code ec; + + _uri = uri; + _connected = promise::create("websocket::connect"); + + _client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ + _hdl = hdl; + auto con = _client.get_con_from_hdl(hdl); + _connection = std::make_shared::connection_ptr>>( con ); + _closed = promise::create("websocket::closed"); + _connected->set_value(); + }); + + auto con = _client.get_connection( uri, ec ); + + for( const fc::http::header& h : headers ) + { + con->append_header( h.key, h.val ); + } + + FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); + + _client.connect(con); + _connected->wait(); + return _connection; + } + fc::promise::ptr _connected; fc::promise::ptr _closed; fc::thread& _client_thread; @@ -685,66 +732,22 @@ namespace fc { namespace http { websocket_connection_ptr websocket_client::connect( const std::string& uri ) { try { - if( uri.substr(0,4) == "wss:" ) - return secure_connect(uri); - FC_ASSERT( uri.substr(0,3) == "ws:" ); - websocketpp::lib::error_code ec; + FC_ASSERT( uri.substr(0,4) == "wss:" || uri.substr(0,3) == "ws:", "Unsupported protocol" ); - my->_uri = uri; - my->_connected = promise::create("websocket::connect"); + // WSS + if( uri.substr(0,4) == "wss:" ) + return smy->connect( uri, _headers ); - my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ - my->_hdl = hdl; - auto con = my->_client.get_con_from_hdl(hdl); - my->_connection = std::make_shared>( con ); - my->_closed = promise::create("websocket::closed"); - my->_connected->set_value(); - }); + // WS + return my->connect( uri, _headers ); - auto con = my->_client.get_connection( uri, ec ); - - std::for_each(headers.begin(), headers.end(), [con](std::pair in) { - con->append_header(in.first, in.second); - }); - if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); - - my->_client.connect(con); - my->_connected->wait(); - return my->_connection; } FC_CAPTURE_AND_RETHROW( (uri) ) } - // TODO most code in this function is same as ::connect, best refactor websocket_connection_ptr websocket_client::secure_connect( const std::string& uri ) - { try { - if( uri.substr(0,3) == "ws:" ) - return connect(uri); - FC_ASSERT( uri.substr(0,4) == "wss:" ); - websocketpp::lib::error_code ec; - - smy->_uri = uri; - smy->_connected = promise::create("websocket::connect"); - - smy->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ - smy->_hdl = hdl; - auto con = smy->_client.get_con_from_hdl(hdl); - smy->_connection = std::make_shared>( con ); - smy->_closed = promise::create("websocket::closed"); - smy->_connected->set_value(); - }); - - auto con = smy->_client.get_connection( uri, ec ); - std::for_each(headers.begin(), headers.end(), [con](std::pair in) { - con->append_header(in.first, in.second); - }); - if( ec ) - FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); - smy->_client.connect(con); - smy->_connected->wait(); - return smy->_connection; - } FC_CAPTURE_AND_RETHROW( (uri) ) } + { + return connect( uri ); + } void websocket_client::close() { @@ -761,7 +764,7 @@ namespace fc { namespace http { void websocket_client::append_header(const std::string& key, const std::string& value) { - headers.push_back( std::make_pair(key, value) ); + _headers.emplace_back( key, value ); } } } // fc::http From 33ce80cc5f1ed7c460d5d52d41877155b64dec41 Mon Sep 17 00:00:00 2001 From: abitmore Date: Sat, 2 May 2020 11:36:05 -0400 Subject: [PATCH 2/9] Remove unused types, wrap long lines --- src/network/http/websocket.cpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index c9bc8e8..69fa508 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -473,11 +473,6 @@ namespace fc { namespace http { std::string _forward_header_key; // A header like "X-Forwarded-For" (XFF), with data IP:port }; - typedef websocketpp::client websocket_client_type; - typedef websocketpp::client websocket_tls_client_type; - - typedef websocket_client_type::connection_ptr websocket_client_connection_type; - typedef websocket_tls_client_type::connection_ptr websocket_tls_client_connection_type; template class generic_websocket_client_impl @@ -673,7 +668,8 @@ namespace fc { namespace http { } void websocket_server::listen( const fc::ip::endpoint& ep ) { - my->_server.listen( boost::asio::ip::tcp::endpoint( boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); + my->_server.listen( boost::asio::ip::tcp::endpoint( + boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); } uint16_t websocket_server::get_listening_port() @@ -715,7 +711,8 @@ namespace fc { namespace http { } void websocket_tls_server::listen( const fc::ip::endpoint& ep ) { - my->_server.listen( boost::asio::ip::tcp::endpoint( boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); + my->_server.listen( boost::asio::ip::tcp::endpoint( + boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); } void websocket_tls_server::start_accept() { From 04f5fae050908717e4b9b6608346855708c547d8 Mon Sep 17 00:00:00 2001 From: abitmore Date: Sat, 2 May 2020 13:33:39 -0400 Subject: [PATCH 3/9] Refactor websocket_tls_server_impl with template Removes unnecessary differences between websocket_server_impl and websocket_tls_server_impl. --- src/network/http/websocket.cpp | 211 ++++++++++++--------------------- 1 file changed, 77 insertions(+), 134 deletions(-) diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index 69fa508..fd12383 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -84,23 +84,20 @@ namespace fc { namespace http { typedef type::elog_type elog_type; typedef type::request_type request_type; typedef type::response_type response_type; - typedef websocketpp::transport::asio::basic_socket::endpoint - socket_type; + typedef websocketpp::transport::asio::basic_socket::endpoint socket_type; }; - typedef websocketpp::transport::asio::endpoint - transport_type; + typedef websocketpp::transport::asio::endpoint transport_type; - static const long timeout_open_handshake = 0; - - // permessage_compress extension - struct permessage_deflate_config {}; + // permessage_compress extension + struct permessage_deflate_config {}; #ifdef HAS_ZLIB - typedef websocketpp::extensions::permessage_deflate::enabled permessage_deflate_type; + typedef websocketpp::extensions::permessage_deflate::enabled + permessage_deflate_type; #else - typedef websocketpp::extensions::permessage_deflate::disabled permessage_deflate_type; + typedef websocketpp::extensions::permessage_deflate::disabled + permessage_deflate_type; #endif - }; struct asio_tls_with_stub_log : public websocketpp::config::asio_tls { @@ -130,10 +127,17 @@ namespace fc { namespace http { typedef websocketpp::transport::asio::tls_socket::endpoint socket_type; }; - typedef websocketpp::transport::asio::endpoint - transport_type; + typedef websocketpp::transport::asio::endpoint transport_type; - static const long timeout_open_handshake = 0; + // permessage_compress extension + struct permessage_deflate_config {}; +#ifdef HAS_ZLIB + typedef websocketpp::extensions::permessage_deflate::enabled + permessage_deflate_type; +#else + typedef websocketpp::extensions::permessage_deflate::disabled + permessage_deflate_type; +#endif }; struct asio_tls_stub_log : public websocketpp::config::asio_tls { typedef asio_tls_stub_log type; @@ -154,21 +158,26 @@ namespace fc { namespace http { typedef base::rng_type rng_type; struct transport_config : public base::transport_config { - typedef type::concurrency_type concurrency_type; - typedef type::alog_type alog_type; - typedef type::elog_type elog_type; - typedef type::request_type request_type; - typedef type::response_type response_type; - typedef websocketpp::transport::asio::tls_socket::endpoint socket_type; + typedef type::concurrency_type concurrency_type; + typedef type::alog_type alog_type; + typedef type::elog_type elog_type; + typedef type::request_type request_type; + typedef type::response_type response_type; + typedef websocketpp::transport::asio::tls_socket::endpoint socket_type; }; - typedef websocketpp::transport::asio::endpoint - transport_type; - }; + typedef websocketpp::transport::asio::endpoint transport_type; - using websocketpp::connection_hdl; - typedef websocketpp::server websocket_server_type; - typedef websocketpp::server websocket_tls_server_type; + // permessage_compress extension + struct permessage_deflate_config {}; +#ifdef HAS_ZLIB + typedef websocketpp::extensions::permessage_deflate::enabled + permessage_deflate_type; +#else + typedef websocketpp::extensions::permessage_deflate::disabled + permessage_deflate_type; +#endif + }; template class websocket_connection_impl : public websocket_connection @@ -227,10 +236,13 @@ namespace fc { namespace http { typedef websocketpp::lib::shared_ptr context_ptr; - class websocket_server_impl + using websocketpp::connection_hdl; + + template + class generic_websocket_server_impl { public: - websocket_server_impl( const std::string& forward_header_key ) + generic_websocket_server_impl( const std::string& forward_header_key ) :_server_thread( fc::thread::current() ), _forward_header_key(forward_header_key) { _server.clear_access_channels( websocketpp::log::alevel::all ); @@ -239,12 +251,13 @@ namespace fc { namespace http { _server.set_open_handler( [&]( connection_hdl hdl ){ _server_thread.async( [this, hdl](){ auto new_con = std::make_shared>( _server.get_con_from_hdl(hdl), - _forward_header_key ); + typename websocketpp::server::connection_ptr>>( _server.get_con_from_hdl(hdl), + _forward_header_key ); _on_connection( _connections[hdl] = new_con ); }).wait(); }); - _server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){ + _server.set_message_handler( [&]( connection_hdl hdl, + typename websocketpp::server::message_ptr msg ){ _server_thread.async( [&](){ auto current_con = _connections.find(hdl); assert( current_con != _connections.end() ); @@ -263,16 +276,17 @@ namespace fc { namespace http { }).wait(); }); - _server.set_socket_init_handler( [&](websocketpp::connection_hdl hdl, boost::asio::ip::tcp::socket& s ) { - boost::asio::ip::tcp::no_delay option(true); - s.lowest_layer().set_option(option); + _server.set_socket_init_handler( [&]( websocketpp::connection_hdl hdl, + typename websocketpp::server::connection_type::socket_type& s ) { + boost::asio::ip::tcp::no_delay option(true); + s.lowest_layer().set_option(option); } ); _server.set_http_handler( [&]( connection_hdl hdl ){ _server_thread.async( [&](){ auto con = _server.get_con_from_hdl(hdl); auto current_con = std::make_shared>( con, _forward_header_key ); + typename websocketpp::server::connection_ptr>>( con, _forward_header_key ); _on_connection( current_con ); con->defer_http_response(); @@ -330,7 +344,8 @@ namespace fc { namespace http { } }); } - ~websocket_server_impl() + + virtual ~generic_websocket_server_impl() { if( _server.is_listening() ) _server.stop_listening(); @@ -345,33 +360,47 @@ namespace fc { namespace http { if( _closed ) _closed->wait(); } - typedef std::map > con_map; + typedef std::map > con_map; con_map _connections; fc::thread& _server_thread; - websocket_server_type _server; + websocketpp::server _server; on_connection_handler _on_connection; fc::promise::ptr _closed; uint32_t _pending_messages = 0; std::string _forward_header_key; // A header like "X-Forwarded-For" (XFF), with data IP:port }; - class websocket_tls_server_impl + class websocket_server_impl : public generic_websocket_server_impl + { + public: + websocket_server_impl( const std::string& forward_header_key ) + : generic_websocket_server_impl( forward_header_key ) + {} + + virtual ~websocket_server_impl() {} + }; + + class websocket_tls_server_impl : public generic_websocket_server_impl { public: websocket_tls_server_impl( const string& server_pem, const string& ssl_password, const std::string& forward_header_key ) - :_server_thread( fc::thread::current() ), _forward_header_key(forward_header_key) + : generic_websocket_server_impl( forward_header_key ) { { _server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr { - context_ptr ctx = websocketpp::lib::make_shared(boost::asio::ssl::context::tlsv1); + context_ptr ctx = websocketpp::lib::make_shared( + boost::asio::ssl::context::tlsv1 ); try { - ctx->set_options(boost::asio::ssl::context::default_workarounds | - boost::asio::ssl::context::no_sslv2 | - boost::asio::ssl::context::no_sslv3 | - boost::asio::ssl::context::single_dh_use); - ctx->set_password_callback([=](std::size_t max_length, boost::asio::ssl::context::password_purpose){ return ssl_password;}); + ctx->set_options( boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::no_sslv2 | + boost::asio::ssl::context::no_sslv3 | + boost::asio::ssl::context::single_dh_use ); + ctx->set_password_callback( + [=](std::size_t max_length, boost::asio::ssl::context::password_purpose){ + return ssl_password; + }); ctx->use_certificate_chain_file(server_pem); ctx->use_private_key_file(server_pem, boost::asio::ssl::context::pem); } catch (std::exception& e) { @@ -380,97 +409,10 @@ namespace fc { namespace http { return ctx; }); } - - _server.clear_access_channels( websocketpp::log::alevel::all ); - _server.init_asio(&fc::asio::default_io_service()); - _server.set_reuse_addr(true); - _server.set_open_handler( [&]( connection_hdl hdl ){ - _server_thread.async( [&](){ - auto new_con = std::make_shared>( _server.get_con_from_hdl(hdl), - _forward_header_key ); - _on_connection( _connections[hdl] = new_con ); - }).wait(); - }); - _server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){ - _server_thread.async( [&](){ - auto current_con = _connections.find(hdl); - assert( current_con != _connections.end() ); - auto received = msg->get_payload(); - std::shared_ptr con = current_con->second; - wlog( "[IN] ${remote_endpoint} ${msg}", - ("remote_endpoint",con->get_remote_endpoint_string()) ("msg",received) ); - fc::async([con,received](){ con->on_message( received ); }); - }).wait(); - }); - - _server.set_http_handler( [&]( connection_hdl hdl ){ - _server_thread.async( [&](){ - - auto con = _server.get_con_from_hdl(hdl); - auto current_con = std::make_shared>( con, _forward_header_key ); - try{ - _on_connection( current_con ); - - std::string remote_endpoint = current_con->get_remote_endpoint_string(); - std::string request_body = con->get_request_body(); - wlog( "[HTTP-IN] ${remote_endpoint} ${msg}", - ("remote_endpoint",remote_endpoint) ("msg",request_body) ); - auto response = current_con->on_http( request_body ); - ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}", - ("remote_endpoint",remote_endpoint) - ("status",response.status) - ("msg",response.body_as_string) ); - con->set_body( std::move( response.body_as_string ) ); - con->set_status( websocketpp::http::status_code::value( response.status ) ); - } catch ( const fc::exception& e ) - { - edump((e.to_detail_string())); - } - current_con->closed(); - - }).wait(); - }); - - _server.set_close_handler( [&]( connection_hdl hdl ){ - _server_thread.async( [&](){ - _connections[hdl]->closed(); - _connections.erase( hdl ); - }).wait(); - }); - - _server.set_fail_handler( [&]( connection_hdl hdl ){ - if( _server.is_listening() ) - { - _server_thread.async( [&](){ - if( _connections.find(hdl) != _connections.end() ) - { - _connections[hdl]->closed(); - _connections.erase( hdl ); - } - }).wait(); - } - }); } - ~websocket_tls_server_impl() - { - if( _server.is_listening() ) - _server.stop_listening(); - auto cpy_con = _connections; - for( auto item : cpy_con ) - _server.close( item.first, 0, "server exit" ); - } + virtual ~websocket_tls_server_impl() {} - typedef std::map > con_map; - - con_map _connections; - fc::thread& _server_thread; - websocket_tls_server_type _server; - on_connection_handler _on_connection; - fc::promise::ptr _closed; - std::string _forward_header_key; // A header like "X-Forwarded-For" (XFF), with data IP:port }; @@ -526,6 +468,7 @@ namespace fc { namespace http { _client.init_asio( &fc::asio::default_io_service() ); } + virtual ~generic_websocket_client_impl() { if( _connection ) From b77fa2a6519db974a454b2fb3e835dac2b51370b Mon Sep 17 00:00:00 2001 From: abitmore Date: Wed, 6 May 2020 11:52:53 -0400 Subject: [PATCH 4/9] Capture only required vars, update coding style and replace an assertion with a check. --- src/network/http/websocket.cpp | 107 +++++++++++++++++---------------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index fd12383..7a4ad0e 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -248,7 +248,7 @@ namespace fc { namespace http { _server.clear_access_channels( websocketpp::log::alevel::all ); _server.init_asio(&fc::asio::default_io_service()); _server.set_reuse_addr(true); - _server.set_open_handler( [&]( connection_hdl hdl ){ + _server.set_open_handler( [this]( connection_hdl hdl ){ _server_thread.async( [this, hdl](){ auto new_con = std::make_shared::connection_ptr>>( _server.get_con_from_hdl(hdl), @@ -256,11 +256,12 @@ namespace fc { namespace http { _on_connection( _connections[hdl] = new_con ); }).wait(); }); - _server.set_message_handler( [&]( connection_hdl hdl, + _server.set_message_handler( [this]( connection_hdl hdl, typename websocketpp::server::message_ptr msg ){ - _server_thread.async( [&](){ + _server_thread.async( [this,hdl,msg](){ auto current_con = _connections.find(hdl); - assert( current_con != _connections.end() ); + if( current_con == _connections.end() ) + return; auto payload = msg->get_payload(); std::shared_ptr con = current_con->second; wlog( "[IN] ${remote_endpoint} ${msg}", @@ -276,14 +277,14 @@ namespace fc { namespace http { }).wait(); }); - _server.set_socket_init_handler( [&]( websocketpp::connection_hdl hdl, + _server.set_socket_init_handler( [this]( websocketpp::connection_hdl hdl, typename websocketpp::server::connection_type::socket_type& s ) { boost::asio::ip::tcp::no_delay option(true); s.lowest_layer().set_option(option); } ); - _server.set_http_handler( [&]( connection_hdl hdl ){ - _server_thread.async( [&](){ + _server.set_http_handler( [this]( connection_hdl hdl ){ + _server_thread.async( [this,hdl](){ auto con = _server.get_con_from_hdl(hdl); auto current_con = std::make_shared::connection_ptr>>( con, _forward_header_key ); @@ -309,8 +310,8 @@ namespace fc { namespace http { }).wait(); }); - _server.set_close_handler( [&]( connection_hdl hdl ){ - _server_thread.async( [&](){ + _server.set_close_handler( [this]( connection_hdl hdl ){ + _server_thread.async( [this,hdl](){ if( _connections.find(hdl) != _connections.end() ) { _connections[hdl]->closed(); @@ -325,10 +326,10 @@ namespace fc { namespace http { }).wait(); }); - _server.set_fail_handler( [&]( connection_hdl hdl ){ + _server.set_fail_handler( [this]( connection_hdl hdl ){ if( _server.is_listening() ) { - _server_thread.async( [&](){ + _server_thread.async( [this,hdl](){ if( _connections.find(hdl) != _connections.end() ) { _connections[hdl]->closed(); @@ -388,27 +389,25 @@ namespace fc { namespace http { const std::string& forward_header_key ) : generic_websocket_server_impl( forward_header_key ) { - { - _server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr { - context_ptr ctx = websocketpp::lib::make_shared( - boost::asio::ssl::context::tlsv1 ); - try { - ctx->set_options( boost::asio::ssl::context::default_workarounds | - boost::asio::ssl::context::no_sslv2 | - boost::asio::ssl::context::no_sslv3 | - boost::asio::ssl::context::single_dh_use ); - ctx->set_password_callback( - [=](std::size_t max_length, boost::asio::ssl::context::password_purpose){ - return ssl_password; - }); - ctx->use_certificate_chain_file(server_pem); - ctx->use_private_key_file(server_pem, boost::asio::ssl::context::pem); - } catch (std::exception& e) { - std::cout << e.what() << std::endl; - } - return ctx; - }); - } + _server.set_tls_init_handler( [this,server_pem,ssl_password]( websocketpp::connection_hdl hdl ) { + context_ptr ctx = websocketpp::lib::make_shared( + boost::asio::ssl::context::tlsv1 ); + try { + ctx->set_options( boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::no_sslv2 | + boost::asio::ssl::context::no_sslv3 | + boost::asio::ssl::context::single_dh_use ); + ctx->set_password_callback( + [ssl_password](std::size_t max_length, boost::asio::ssl::context::password_purpose){ + return ssl_password; + }); + ctx->use_certificate_chain_file(server_pem); + ctx->use_private_key_file(server_pem, boost::asio::ssl::context::pem); + } catch (std::exception& e) { + std::cout << e.what() << std::endl; + } + return ctx; + }); } virtual ~websocket_tls_server_impl() {} @@ -424,19 +423,19 @@ namespace fc { namespace http { :_client_thread( fc::thread::current() ) { _client.clear_access_channels( websocketpp::log::alevel::all ); - _client.set_message_handler( [&]( connection_hdl hdl, + _client.set_message_handler( [this]( connection_hdl hdl, typename websocketpp::client::message_ptr msg ){ - _client_thread.async( [&](){ + _client_thread.async( [this,msg](){ wdump((msg->get_payload())); auto received = msg->get_payload(); - fc::async( [=](){ + fc::async( [this,received](){ if( _connection ) _connection->on_message(received); }); }).wait(); }); - _client.set_close_handler( [=]( connection_hdl hdl ){ - _client_thread.async( [&](){ + _client.set_close_handler( [this]( connection_hdl hdl ){ + _client_thread.async( [this](){ if( _connection ) { _connection->closed(); _connection.reset(); @@ -445,12 +444,12 @@ namespace fc { namespace http { if( _closed ) _closed->set_value(); }); - _client.set_fail_handler( [=]( connection_hdl hdl ){ + _client.set_fail_handler( [this]( connection_hdl hdl ){ auto con = _client.get_con_from_hdl(hdl); auto message = con->get_ec().message(); if( _connection ) { - _client_thread.async( [&](){ + _client_thread.async( [this](){ if( _connection ) { _connection->closed(); _connection.reset(); @@ -487,7 +486,7 @@ namespace fc { namespace http { _uri = uri; _connected = promise::create("websocket::connect"); - _client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ + _client.set_open_handler( [this]( websocketpp::connection_hdl hdl ){ _hdl = hdl; auto con = _client.get_con_from_hdl(hdl); _connection = std::make_shared(boost::asio::ssl::context::tlsv1); + _client.set_tls_init_handler( [this,ca_filename_copy](websocketpp::connection_hdl) { + context_ptr ctx = websocketpp::lib::make_shared( + boost::asio::ssl::context::tlsv1); try { - ctx->set_options(boost::asio::ssl::context::default_workarounds | - boost::asio::ssl::context::no_sslv2 | - boost::asio::ssl::context::no_sslv3 | - boost::asio::ssl::context::single_dh_use); + ctx->set_options( boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::no_sslv2 | + boost::asio::ssl::context::no_sslv3 | + boost::asio::ssl::context::single_dh_use ); setup_peer_verify( ctx, ca_filename_copy ); } catch (std::exception& e) { From c55b1fde3cb12e86079cdfde05abfd042b3f828f Mon Sep 17 00:00:00 2001 From: abitmore Date: Wed, 6 May 2020 12:53:04 -0400 Subject: [PATCH 5/9] Add notes --- src/network/http/websocket.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index 7a4ad0e..c61c190 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -273,7 +273,12 @@ namespace fc { namespace http { con->on_message( payload ); }); if( _pending_messages > 100 ) - f.wait(); + f.wait(); // Note: this is a bit strange, because it forces the server to process all + // 100 pending messages (assuming this message is the last one) before + // trying to accept a new message. + // Ideally the `wait` should be canceled immediately when the number of + // pending messages falls below 100. That said, wait on the whole queue, + // but not wait on one message. }).wait(); }); @@ -290,7 +295,8 @@ namespace fc { namespace http { typename websocketpp::server::connection_ptr>>( con, _forward_header_key ); _on_connection( current_con ); - con->defer_http_response(); + con->defer_http_response(); // Note: this can tie up resources if send_http_response() is not + // called quickly enough std::string remote_endpoint = current_con->get_remote_endpoint_string(); std::string request_body = con->get_request_body(); wlog( "[HTTP-IN] ${remote_endpoint} ${msg}", @@ -351,6 +357,9 @@ namespace fc { namespace http { if( _server.is_listening() ) _server.stop_listening(); + // Note: since _connections can be modified by lambda functions in set_*_handler, which are running + // in another thread, perhaps we need to wait for them (especially the one in set_open_handler) + // being processed. Otherwise `_closed.wait()` may hang. if( _connections.size() ) _closed = promise::create(); @@ -363,7 +372,8 @@ namespace fc { namespace http { typedef std::map > con_map; - con_map _connections; + con_map _connections; // Note: std::map is not thread-safe nor task-safe, we may need + // to use a mutex or similar to avoid concurrent access. fc::thread& _server_thread; websocketpp::server _server; on_connection_handler _on_connection; From 4dd1319a9e7f612918626ef4fe3cea897376fdeb Mon Sep 17 00:00:00 2001 From: abitmore Date: Wed, 6 May 2020 13:07:24 -0400 Subject: [PATCH 6/9] Refactor destructor of websocket_server_impl --- src/network/http/websocket.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index c61c190..f9e1527 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -361,13 +361,15 @@ namespace fc { namespace http { // in another thread, perhaps we need to wait for them (especially the one in set_open_handler) // being processed. Otherwise `_closed.wait()` may hang. if( _connections.size() ) + { _closed = promise::create(); - auto cpy_con = _connections; - for( auto item : cpy_con ) - _server.close( item.first, 0, "server exit" ); + auto cpy_con = _connections; + for( auto& item : cpy_con ) + _server.close( item.first, 0, "server exit" ); - if( _closed ) _closed->wait(); + _closed->wait(); + } } typedef std::map > con_map; From 9ae294293f2c3c096686975a096f938879228fc9 Mon Sep 17 00:00:00 2001 From: abitmore Date: Thu, 18 Jun 2020 18:57:33 -0400 Subject: [PATCH 7/9] Update coding style --- src/network/http/websocket.cpp | 190 ++++++++++++++++----------------- 1 file changed, 95 insertions(+), 95 deletions(-) diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index f9e1527..c970152 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -243,112 +243,112 @@ namespace fc { namespace http { { public: generic_websocket_server_impl( const std::string& forward_header_key ) - :_server_thread( fc::thread::current() ), _forward_header_key(forward_header_key) + :_server_thread( fc::thread::current() ), _forward_header_key( forward_header_key ) { _server.clear_access_channels( websocketpp::log::alevel::all ); - _server.init_asio(&fc::asio::default_io_service()); - _server.set_reuse_addr(true); + _server.init_asio( &fc::asio::default_io_service() ); + _server.set_reuse_addr( true ); _server.set_open_handler( [this]( connection_hdl hdl ){ _server_thread.async( [this, hdl](){ - auto new_con = std::make_shared::connection_ptr>>( _server.get_con_from_hdl(hdl), _forward_header_key ); - _on_connection( _connections[hdl] = new_con ); - }).wait(); + _on_connection( _connections[hdl] = new_con ); + }).wait(); }); _server.set_message_handler( [this]( connection_hdl hdl, typename websocketpp::server::message_ptr msg ){ - _server_thread.async( [this,hdl,msg](){ - auto current_con = _connections.find(hdl); - if( current_con == _connections.end() ) - return; - auto payload = msg->get_payload(); - std::shared_ptr con = current_con->second; - wlog( "[IN] ${remote_endpoint} ${msg}", - ("remote_endpoint",con->get_remote_endpoint_string()) ("msg",payload) ); - ++_pending_messages; - auto f = fc::async([this,con,payload](){ - if( _pending_messages ) - --_pending_messages; - con->on_message( payload ); - }); - if( _pending_messages > 100 ) - f.wait(); // Note: this is a bit strange, because it forces the server to process all - // 100 pending messages (assuming this message is the last one) before - // trying to accept a new message. - // Ideally the `wait` should be canceled immediately when the number of - // pending messages falls below 100. That said, wait on the whole queue, - // but not wait on one message. - }).wait(); + _server_thread.async( [this,hdl,msg](){ + auto current_con = _connections.find(hdl); + if( current_con == _connections.end() ) + return; + auto payload = msg->get_payload(); + std::shared_ptr con = current_con->second; + wlog( "[IN] ${remote_endpoint} ${msg}", + ("remote_endpoint",con->get_remote_endpoint_string()) ("msg",payload) ); + ++_pending_messages; + auto f = fc::async([this,con,payload](){ + if( _pending_messages ) + --_pending_messages; + con->on_message( payload ); + }); + if( _pending_messages > 100 ) + f.wait(); // Note: this is a bit strange, because it forces the server to process all + // 100 pending messages (assuming this message is the last one) before + // trying to accept a new message. + // Ideally the `wait` should be canceled immediately when the number of + // pending messages falls below 100. That said, wait on the whole queue, + // but not wait on one message. + }).wait(); }); _server.set_socket_init_handler( [this]( websocketpp::connection_hdl hdl, typename websocketpp::server::connection_type::socket_type& s ) { - boost::asio::ip::tcp::no_delay option(true); - s.lowest_layer().set_option(option); + boost::asio::ip::tcp::no_delay option(true); + s.lowest_layer().set_option(option); } ); _server.set_http_handler( [this]( connection_hdl hdl ){ - _server_thread.async( [this,hdl](){ - auto con = _server.get_con_from_hdl(hdl); - auto current_con = std::make_shared::connection_ptr>>( con, _forward_header_key ); - _on_connection( current_con ); + _server_thread.async( [this,hdl](){ + auto con = _server.get_con_from_hdl(hdl); + auto current_con = std::make_shared::connection_ptr>>( con, _forward_header_key ); + _on_connection( current_con ); - con->defer_http_response(); // Note: this can tie up resources if send_http_response() is not - // called quickly enough - std::string remote_endpoint = current_con->get_remote_endpoint_string(); - std::string request_body = con->get_request_body(); - wlog( "[HTTP-IN] ${remote_endpoint} ${msg}", - ("remote_endpoint",remote_endpoint) ("msg",request_body) ); + con->defer_http_response(); // Note: this can tie up resources if send_http_response() is not + // called quickly enough + std::string remote_endpoint = current_con->get_remote_endpoint_string(); + std::string request_body = con->get_request_body(); + wlog( "[HTTP-IN] ${remote_endpoint} ${msg}", + ("remote_endpoint",remote_endpoint) ("msg",request_body) ); - fc::async([current_con, request_body, con, remote_endpoint] { - fc::http::reply response = current_con->on_http(request_body); - ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}", - ("remote_endpoint",remote_endpoint) - ("status",response.status) - ("msg",response.body_as_string) ); - con->set_body( std::move( response.body_as_string ) ); - con->set_status( websocketpp::http::status_code::value(response.status) ); - con->send_http_response(); - current_con->closed(); - }, "call on_http"); - }).wait(); + fc::async([current_con, request_body, con, remote_endpoint] { + fc::http::reply response = current_con->on_http(request_body); + ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}", + ("remote_endpoint",remote_endpoint) + ("status",response.status) + ("msg",response.body_as_string) ); + con->set_body( std::move( response.body_as_string ) ); + con->set_status( websocketpp::http::status_code::value(response.status) ); + con->send_http_response(); + current_con->closed(); + }, "call on_http"); + }).wait(); }); _server.set_close_handler( [this]( connection_hdl hdl ){ - _server_thread.async( [this,hdl](){ - if( _connections.find(hdl) != _connections.end() ) - { - _connections[hdl]->closed(); - _connections.erase( hdl ); - } - else - { - wlog( "unknown connection closed" ); - } - if( _connections.empty() && _closed ) - _closed->set_value(); - }).wait(); + _server_thread.async( [this,hdl](){ + if( _connections.find(hdl) != _connections.end() ) + { + _connections[hdl]->closed(); + _connections.erase( hdl ); + } + else + { + wlog( "unknown connection closed" ); + } + if( _connections.empty() && _closed ) + _closed->set_value(); + }).wait(); }); _server.set_fail_handler( [this]( connection_hdl hdl ){ - if( _server.is_listening() ) - { - _server_thread.async( [this,hdl](){ - if( _connections.find(hdl) != _connections.end() ) - { - _connections[hdl]->closed(); - _connections.erase( hdl ); - } - else - { - wlog( "unknown connection failed" ); - } - if( _connections.empty() && _closed ) - _closed->set_value(); - }).wait(); - } + if( _server.is_listening() ) + { + _server_thread.async( [this,hdl](){ + if( _connections.find(hdl) != _connections.end() ) + { + _connections[hdl]->closed(); + _connections.erase( hdl ); + } + else + { + wlog( "unknown connection failed" ); + } + if( _connections.empty() && _closed ) + _closed->set_value(); + }).wait(); + } }); } @@ -628,29 +628,29 @@ namespace fc { namespace http { } void websocket_server::listen( const fc::ip::endpoint& ep ) { - my->_server.listen( boost::asio::ip::tcp::endpoint( - boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); + my->_server.listen( boost::asio::ip::tcp::endpoint( + boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); } uint16_t websocket_server::get_listening_port() { - websocketpp::lib::asio::error_code ec; - return my->_server.get_local_endpoint(ec).port(); + websocketpp::lib::asio::error_code ec; + return my->_server.get_local_endpoint(ec).port(); } void websocket_server::start_accept() { - my->_server.start_accept(); + my->_server.start_accept(); } void websocket_server::stop_listening() { - my->_server.stop_listening(); + my->_server.stop_listening(); } void websocket_server::close() { - for (auto& connection : my->_connections) - my->_server.close(connection.first, websocketpp::close::status::normal, "Goodbye"); + for( auto& connection : my->_connections ) + my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye" ); } websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password, @@ -708,15 +708,15 @@ namespace fc { namespace http { void websocket_client::close() { - if (my->_hdl) - my->_client.close(*my->_hdl, websocketpp::close::status::normal, "Goodbye"); + if( my->_hdl ) + my->_client.close( *my->_hdl, websocketpp::close::status::normal, "Goodbye" ); } void websocket_client::synchronous_close() { - close(); - if (my->_closed) - my->_closed->wait(); + close(); + if( my->_closed ) + my->_closed->wait(); } void websocket_client::append_header(const std::string& key, const std::string& value) From aa671d61b59f09e579a2664de1f18b7f61f46b31 Mon Sep 17 00:00:00 2001 From: abitmore Date: Sat, 20 Jun 2020 11:36:21 -0400 Subject: [PATCH 8/9] Add promise to wait for closure of server socket This may fix the issues that a websocket server may crash or hang when quitting --- src/network/http/websocket.cpp | 79 +++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index c970152..66d0f4a 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -322,66 +322,82 @@ namespace fc { namespace http { { _connections[hdl]->closed(); _connections.erase( hdl ); + if( _connections.empty() && _all_connections_closed ) + _all_connections_closed->set_value(); } else { wlog( "unknown connection closed" ); } - if( _connections.empty() && _closed ) - _closed->set_value(); }).wait(); }); _server.set_fail_handler( [this]( connection_hdl hdl ){ - if( _server.is_listening() ) - { - _server_thread.async( [this,hdl](){ - if( _connections.find(hdl) != _connections.end() ) - { - _connections[hdl]->closed(); - _connections.erase( hdl ); - } + _server_thread.async( [this,hdl](){ + if( _connections.find(hdl) != _connections.end() ) + { + _connections[hdl]->closed(); + _connections.erase( hdl ); + if( _connections.empty() && _all_connections_closed ) + _all_connections_closed->set_value(); + } + else + { + // if the server is shutting down, assume this hdl is the server socket + if( _server_socket_closed ) + _server_socket_closed->set_value(); else - { wlog( "unknown connection failed" ); - } - if( _connections.empty() && _closed ) - _closed->set_value(); - }).wait(); - } + } + }).wait(); }); } virtual ~generic_websocket_server_impl() { if( _server.is_listening() ) + { + // _server.stop_listening() may trigger the on_fail callback function (the lambda function set by + // _server.set_fail_handler(...) ) for the listening server socket (note: the connection handle + // associated with the server socket is not in our connection map), + // the on_fail callback function may fire (async) a new task which may run really late + // and it will try to access the member variables of this server object, + // so we need to wait for it before destructing this object. + _server_socket_closed = promise::create(); _server.stop_listening(); + } // Note: since _connections can be modified by lambda functions in set_*_handler, which are running - // in another thread, perhaps we need to wait for them (especially the one in set_open_handler) - // being processed. Otherwise `_closed.wait()` may hang. - if( _connections.size() ) + // in other tasks, perhaps we need to wait for them (especially the one in set_open_handler) + // being processed. Otherwise `_all_connections_closed.wait()` may hang. + if( !_connections.empty() ) { - _closed = promise::create(); + _all_connections_closed = promise::create(); auto cpy_con = _connections; + websocketpp::lib::error_code ec; for( auto& item : cpy_con ) - _server.close( item.first, 0, "server exit" ); + _server.close( item.first, 0, "server exit", ec ); - _closed->wait(); + _all_connections_closed->wait(); } + + if( _server_socket_closed ) + _server_socket_closed->wait(); } typedef std::map > con_map; - con_map _connections; // Note: std::map is not thread-safe nor task-safe, we may need - // to use a mutex or similar to avoid concurrent access. - fc::thread& _server_thread; - websocketpp::server _server; - on_connection_handler _on_connection; - fc::promise::ptr _closed; - uint32_t _pending_messages = 0; - std::string _forward_header_key; // A header like "X-Forwarded-For" (XFF), with data IP:port + // Note: std::map is not thread-safe nor task-safe, we may need + // to use a mutex or similar to avoid concurrent access. + con_map _connections; ///< Holds accepted connections + fc::thread& _server_thread; ///< The thread that runs the server + websocketpp::server _server; ///< The server + on_connection_handler _on_connection; ///< A handler to be called when a new connection is accepted + fc::promise::ptr _all_connections_closed; ///< Promise to wait for all connections to be closed + fc::promise::ptr _server_socket_closed; ///< Promise to wait for the server socket to be closed + uint32_t _pending_messages = 0; ///< Number of messages not processed, for rate limiting + std::string _forward_header_key; ///< A header like "X-Forwarded-For" (XFF) with data IP:port }; class websocket_server_impl : public generic_websocket_server_impl @@ -649,8 +665,9 @@ namespace fc { namespace http { void websocket_server::close() { + websocketpp::lib::error_code ec; for( auto& connection : my->_connections ) - my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye" ); + my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye", ec ); } websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password, From 0d9b127e3420ea1e7828f26fba3d54d23bec0b91 Mon Sep 17 00:00:00 2001 From: abitmore Date: Sat, 20 Jun 2020 13:01:22 -0400 Subject: [PATCH 9/9] Add missing functions to websocket_tls_server These functions were in websocket_server class but not in websocket_tls_server class: - get_listening_port - stop_listening - close --- include/fc/network/http/websocket.hpp | 9 +++++++++ src/network/http/websocket.cpp | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp index 5816a57..4894055 100644 --- a/include/fc/network/http/websocket.hpp +++ b/include/fc/network/http/websocket.hpp @@ -48,6 +48,8 @@ namespace fc { namespace http { typedef std::function on_connection_handler; + // TODO websocket_tls_server and websocket_server have almost the same interface and implementation, + // better refactor to remove duplicate code and to avoid undesired or unnecessary differences class websocket_server { public: @@ -69,6 +71,8 @@ namespace fc { namespace http { }; + // TODO websocket_tls_server and websocket_server have almost the same interface and implementation, + // better refactor to remove duplicate code and to avoid undesired or unnecessary differences class websocket_tls_server { public: @@ -80,7 +84,12 @@ namespace fc { namespace http { void on_connection( const on_connection_handler& handler); void listen( uint16_t port ); void listen( const fc::ip::endpoint& ep ); + uint16_t get_listening_port(); void start_accept(); + + void stop_listening(); + void close(); + private: friend class detail::websocket_tls_server_impl; std::unique_ptr my; diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index 66d0f4a..2819967 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -692,10 +692,28 @@ namespace fc { namespace http { boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); } + uint16_t websocket_tls_server::get_listening_port() + { + websocketpp::lib::asio::error_code ec; + return my->_server.get_local_endpoint(ec).port(); + } + void websocket_tls_server::start_accept() { my->_server.start_accept(); } + void websocket_tls_server::stop_listening() + { + my->_server.stop_listening(); + } + + void websocket_tls_server::close() + { + websocketpp::lib::error_code ec; + for( auto& connection : my->_connections ) + my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye", ec ); + } + websocket_client::websocket_client( const std::string& ca_filename ) :my( new detail::websocket_client_impl() ),