diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index f9e1527..c970152 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -243,112 +243,112 @@ namespace fc { namespace http { { public: generic_websocket_server_impl( const std::string& forward_header_key ) - :_server_thread( fc::thread::current() ), _forward_header_key(forward_header_key) + :_server_thread( fc::thread::current() ), _forward_header_key( forward_header_key ) { _server.clear_access_channels( websocketpp::log::alevel::all ); - _server.init_asio(&fc::asio::default_io_service()); - _server.set_reuse_addr(true); + _server.init_asio( &fc::asio::default_io_service() ); + _server.set_reuse_addr( true ); _server.set_open_handler( [this]( connection_hdl hdl ){ _server_thread.async( [this, hdl](){ - auto new_con = std::make_shared::connection_ptr>>( _server.get_con_from_hdl(hdl), _forward_header_key ); - _on_connection( _connections[hdl] = new_con ); - }).wait(); + _on_connection( _connections[hdl] = new_con ); + }).wait(); }); _server.set_message_handler( [this]( connection_hdl hdl, typename websocketpp::server::message_ptr msg ){ - _server_thread.async( [this,hdl,msg](){ - auto current_con = _connections.find(hdl); - if( current_con == _connections.end() ) - return; - auto payload = msg->get_payload(); - std::shared_ptr con = current_con->second; - wlog( "[IN] ${remote_endpoint} ${msg}", - ("remote_endpoint",con->get_remote_endpoint_string()) ("msg",payload) ); - ++_pending_messages; - auto f = fc::async([this,con,payload](){ - if( _pending_messages ) - --_pending_messages; - con->on_message( payload ); - }); - if( _pending_messages > 100 ) - f.wait(); // Note: this is a bit strange, because it forces the server to process all - // 100 pending messages (assuming this message is the last one) before - // trying to accept a new message. - // Ideally the `wait` should be canceled immediately when the number of - // pending messages falls below 100. That said, wait on the whole queue, - // but not wait on one message. - }).wait(); + _server_thread.async( [this,hdl,msg](){ + auto current_con = _connections.find(hdl); + if( current_con == _connections.end() ) + return; + auto payload = msg->get_payload(); + std::shared_ptr con = current_con->second; + wlog( "[IN] ${remote_endpoint} ${msg}", + ("remote_endpoint",con->get_remote_endpoint_string()) ("msg",payload) ); + ++_pending_messages; + auto f = fc::async([this,con,payload](){ + if( _pending_messages ) + --_pending_messages; + con->on_message( payload ); + }); + if( _pending_messages > 100 ) + f.wait(); // Note: this is a bit strange, because it forces the server to process all + // 100 pending messages (assuming this message is the last one) before + // trying to accept a new message. + // Ideally the `wait` should be canceled immediately when the number of + // pending messages falls below 100. That said, wait on the whole queue, + // but not wait on one message. + }).wait(); }); _server.set_socket_init_handler( [this]( websocketpp::connection_hdl hdl, typename websocketpp::server::connection_type::socket_type& s ) { - boost::asio::ip::tcp::no_delay option(true); - s.lowest_layer().set_option(option); + boost::asio::ip::tcp::no_delay option(true); + s.lowest_layer().set_option(option); } ); _server.set_http_handler( [this]( connection_hdl hdl ){ - _server_thread.async( [this,hdl](){ - auto con = _server.get_con_from_hdl(hdl); - auto current_con = std::make_shared::connection_ptr>>( con, _forward_header_key ); - _on_connection( current_con ); + _server_thread.async( [this,hdl](){ + auto con = _server.get_con_from_hdl(hdl); + auto current_con = std::make_shared::connection_ptr>>( con, _forward_header_key ); + _on_connection( current_con ); - con->defer_http_response(); // Note: this can tie up resources if send_http_response() is not - // called quickly enough - std::string remote_endpoint = current_con->get_remote_endpoint_string(); - std::string request_body = con->get_request_body(); - wlog( "[HTTP-IN] ${remote_endpoint} ${msg}", - ("remote_endpoint",remote_endpoint) ("msg",request_body) ); + con->defer_http_response(); // Note: this can tie up resources if send_http_response() is not + // called quickly enough + std::string remote_endpoint = current_con->get_remote_endpoint_string(); + std::string request_body = con->get_request_body(); + wlog( "[HTTP-IN] ${remote_endpoint} ${msg}", + ("remote_endpoint",remote_endpoint) ("msg",request_body) ); - fc::async([current_con, request_body, con, remote_endpoint] { - fc::http::reply response = current_con->on_http(request_body); - ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}", - ("remote_endpoint",remote_endpoint) - ("status",response.status) - ("msg",response.body_as_string) ); - con->set_body( std::move( response.body_as_string ) ); - con->set_status( websocketpp::http::status_code::value(response.status) ); - con->send_http_response(); - current_con->closed(); - }, "call on_http"); - }).wait(); + fc::async([current_con, request_body, con, remote_endpoint] { + fc::http::reply response = current_con->on_http(request_body); + ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}", + ("remote_endpoint",remote_endpoint) + ("status",response.status) + ("msg",response.body_as_string) ); + con->set_body( std::move( response.body_as_string ) ); + con->set_status( websocketpp::http::status_code::value(response.status) ); + con->send_http_response(); + current_con->closed(); + }, "call on_http"); + }).wait(); }); _server.set_close_handler( [this]( connection_hdl hdl ){ - _server_thread.async( [this,hdl](){ - if( _connections.find(hdl) != _connections.end() ) - { - _connections[hdl]->closed(); - _connections.erase( hdl ); - } - else - { - wlog( "unknown connection closed" ); - } - if( _connections.empty() && _closed ) - _closed->set_value(); - }).wait(); + _server_thread.async( [this,hdl](){ + if( _connections.find(hdl) != _connections.end() ) + { + _connections[hdl]->closed(); + _connections.erase( hdl ); + } + else + { + wlog( "unknown connection closed" ); + } + if( _connections.empty() && _closed ) + _closed->set_value(); + }).wait(); }); _server.set_fail_handler( [this]( connection_hdl hdl ){ - if( _server.is_listening() ) - { - _server_thread.async( [this,hdl](){ - if( _connections.find(hdl) != _connections.end() ) - { - _connections[hdl]->closed(); - _connections.erase( hdl ); - } - else - { - wlog( "unknown connection failed" ); - } - if( _connections.empty() && _closed ) - _closed->set_value(); - }).wait(); - } + if( _server.is_listening() ) + { + _server_thread.async( [this,hdl](){ + if( _connections.find(hdl) != _connections.end() ) + { + _connections[hdl]->closed(); + _connections.erase( hdl ); + } + else + { + wlog( "unknown connection failed" ); + } + if( _connections.empty() && _closed ) + _closed->set_value(); + }).wait(); + } }); } @@ -628,29 +628,29 @@ namespace fc { namespace http { } void websocket_server::listen( const fc::ip::endpoint& ep ) { - my->_server.listen( boost::asio::ip::tcp::endpoint( - boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); + my->_server.listen( boost::asio::ip::tcp::endpoint( + boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); } uint16_t websocket_server::get_listening_port() { - websocketpp::lib::asio::error_code ec; - return my->_server.get_local_endpoint(ec).port(); + websocketpp::lib::asio::error_code ec; + return my->_server.get_local_endpoint(ec).port(); } void websocket_server::start_accept() { - my->_server.start_accept(); + my->_server.start_accept(); } void websocket_server::stop_listening() { - my->_server.stop_listening(); + my->_server.stop_listening(); } void websocket_server::close() { - for (auto& connection : my->_connections) - my->_server.close(connection.first, websocketpp::close::status::normal, "Goodbye"); + for( auto& connection : my->_connections ) + my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye" ); } websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password, @@ -708,15 +708,15 @@ namespace fc { namespace http { void websocket_client::close() { - if (my->_hdl) - my->_client.close(*my->_hdl, websocketpp::close::status::normal, "Goodbye"); + if( my->_hdl ) + my->_client.close( *my->_hdl, websocketpp::close::status::normal, "Goodbye" ); } void websocket_client::synchronous_close() { - close(); - if (my->_closed) - my->_closed->wait(); + close(); + if( my->_closed ) + my->_closed->wait(); } void websocket_client::append_header(const std::string& key, const std::string& value)