diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp index 1cb6417..c0c7266 100644 --- a/include/fc/network/http/websocket.hpp +++ b/include/fc/network/http/websocket.hpp @@ -8,7 +8,8 @@ namespace fc { namespace http { namespace detail { - class abstract_websocket_server; + class websocket_server_impl; + class websocket_tls_server_impl; class websocket_client_impl; class websocket_tls_client_impl; } // namespace detail; @@ -41,7 +42,7 @@ namespace fc { namespace http { class websocket_server { public: - websocket_server(bool enable_permessage_deflate = true); + websocket_server(); ~websocket_server(); void on_connection( const on_connection_handler& handler); @@ -50,16 +51,16 @@ namespace fc { namespace http { void start_accept(); private: - std::unique_ptr my; + friend class detail::websocket_server_impl; + std::unique_ptr my; }; class websocket_tls_server { public: - websocket_tls_server(const std::string& server_pem = std::string(), - const std::string& ssl_password = std::string(), - bool enable_permessage_deflate = true); + websocket_tls_server( const std::string& server_pem = std::string(), + const std::string& ssl_password = std::string()); ~websocket_tls_server(); void on_connection( const on_connection_handler& handler); @@ -68,7 +69,8 @@ namespace fc { namespace http { void start_accept(); private: - std::unique_ptr my; + friend class detail::websocket_tls_server_impl; + std::unique_ptr my; }; class websocket_client diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index 81a7cad..c75f0e8 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -20,11 +20,12 @@ namespace fc { namespace http { namespace detail { + struct asio_with_stub_log : public websocketpp::config::asio { + typedef asio_with_stub_log type; typedef asio base; - //// All boilerplate copying the base class's config, except as noted typedef base::concurrency_type concurrency_type; typedef base::request_type request_type; @@ -33,8 +34,15 @@ namespace fc { namespace http { typedef base::message_type message_type; typedef base::con_msg_manager_type con_msg_manager_type; typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; - - /// Custom Logging policies, use do-nothing log::stub instead of log::basic + + /// Custom Logging policies + /*typedef websocketpp::log::syslog elog_type; + typedef websocketpp::log::syslog alog_type; + */ + //typedef base::alog_type alog_type; + //typedef base::elog_type elog_type; typedef websocketpp::log::stub elog_type; typedef websocketpp::log::stub alog_type; @@ -53,124 +61,60 @@ namespace fc { namespace http { typedef websocketpp::transport::asio::endpoint transport_type; - // override default value of 5 sec timeout static const long timeout_open_handshake = 0; - }; - - struct asio_with_stub_log_and_deflate : public websocketpp::config::asio { - typedef asio_with_stub_log_and_deflate type; - typedef asio base; - //// All boilerplate copying the base class's config, except as noted - typedef base::concurrency_type concurrency_type; - - typedef base::request_type request_type; - typedef base::response_type response_type; - - typedef base::message_type message_type; - typedef base::con_msg_manager_type con_msg_manager_type; - typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; - - /// Custom Logging policies, use do-nothing log::stub instead of log::basic - typedef websocketpp::log::stub elog_type; - typedef websocketpp::log::stub alog_type; - - typedef base::rng_type rng_type; - - struct transport_config : public base::transport_config { - typedef type::concurrency_type concurrency_type; - typedef type::alog_type alog_type; - typedef type::elog_type elog_type; - typedef type::request_type request_type; - typedef type::response_type response_type; - typedef websocketpp::transport::asio::basic_socket::endpoint - socket_type; - }; - - typedef websocketpp::transport::asio::endpoint - transport_type; - - /// enable the permessage_compress extension + /// permessage_compress extension struct permessage_deflate_config {}; + typedef websocketpp::extensions::permessage_deflate::enabled permessage_deflate_type; - - // override default value of 5 sec timeout - static const long timeout_open_handshake = 0; }; - struct asio_tls_stub_log : public websocketpp::config::asio_tls { - typedef asio_tls_stub_log type; - typedef asio_tls base; + typedef asio_tls_stub_log type; + typedef asio_tls base; - //// All boilerplate copying the base class's config, except as noted - typedef base::concurrency_type concurrency_type; + typedef base::concurrency_type concurrency_type; - typedef base::request_type request_type; - typedef base::response_type response_type; + typedef base::request_type request_type; + typedef base::response_type response_type; - typedef base::message_type message_type; - typedef base::con_msg_manager_type con_msg_manager_type; - typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; + typedef base::message_type message_type; + typedef base::con_msg_manager_type con_msg_manager_type; + typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; - /// Custom Logging policies, use do-nothing log::stub instead of log::basic - typedef websocketpp::log::stub elog_type; - typedef websocketpp::log::stub alog_type; + //typedef base::alog_type alog_type; + //typedef base::elog_type elog_type; + typedef websocketpp::log::stub elog_type; + typedef websocketpp::log::stub alog_type; - typedef base::rng_type rng_type; + typedef base::rng_type rng_type; - struct transport_config : public base::transport_config { - typedef type::concurrency_type concurrency_type; - typedef type::alog_type alog_type; - typedef type::elog_type elog_type; - typedef type::request_type request_type; - typedef type::response_type response_type; - typedef websocketpp::transport::asio::tls_socket::endpoint socket_type; - }; + struct transport_config : public base::transport_config { + typedef type::concurrency_type concurrency_type; + typedef type::alog_type alog_type; + typedef type::elog_type elog_type; + typedef type::request_type request_type; + typedef type::response_type response_type; + typedef websocketpp::transport::asio::tls_socket::endpoint socket_type; + }; - typedef websocketpp::transport::asio::endpoint - transport_type; + typedef websocketpp::transport::asio::endpoint + transport_type; + + /// permessage_compress extension + struct permessage_deflate_config {}; + + typedef websocketpp::extensions::permessage_deflate::enabled + permessage_deflate_type; }; - struct asio_tls_stub_log_and_deflate : public websocketpp::config::asio_tls { - typedef asio_tls_stub_log_and_deflate type; - typedef asio_tls base; - //// All boilerplate copying the base class's config, except as noted - typedef base::concurrency_type concurrency_type; - typedef base::request_type request_type; - typedef base::response_type response_type; - typedef base::message_type message_type; - typedef base::con_msg_manager_type con_msg_manager_type; - typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; - - /// Custom Logging policies, use do-nothing log::stub instead of log::basic - typedef websocketpp::log::stub elog_type; - typedef websocketpp::log::stub alog_type; - - typedef base::rng_type rng_type; - - struct transport_config : public base::transport_config { - typedef type::concurrency_type concurrency_type; - typedef type::alog_type alog_type; - typedef type::elog_type elog_type; - typedef type::request_type request_type; - typedef type::response_type response_type; - typedef websocketpp::transport::asio::tls_socket::endpoint socket_type; - }; - - typedef websocketpp::transport::asio::endpoint - transport_type; - - /// enable the permessage_compress extension - struct permessage_deflate_config {}; - typedef websocketpp::extensions::permessage_deflate::enabled - permessage_deflate_type; - }; using websocketpp::connection_hdl; + typedef websocketpp::server websocket_server_type; + typedef websocketpp::server websocket_tls_server_type; template class websocket_connection_impl : public websocket_connection @@ -201,19 +145,7 @@ namespace fc { namespace http { typedef websocketpp::lib::shared_ptr context_ptr; - class abstract_websocket_server - { - public: - virtual ~abstract_websocket_server() {} - - virtual void on_connection( const on_connection_handler& handler) = 0; - virtual void listen( uint16_t port ) = 0; - virtual void listen( const fc::ip::endpoint& ep ) = 0; - virtual void start_accept() = 0; - }; - - template - class websocket_server_impl : public abstract_websocket_server + class websocket_server_impl { public: websocket_server_impl() @@ -225,15 +157,15 @@ namespace fc { namespace http { _server.set_reuse_addr(true); _server.set_open_handler( [&]( connection_hdl hdl ){ _server_thread.async( [&](){ - websocket_connection_ptr new_con = std::make_shared::connection_ptr>>( _server.get_con_from_hdl(hdl) ); + auto new_con = std::make_shared>( _server.get_con_from_hdl(hdl) ); _on_connection( _connections[hdl] = new_con ); }).wait(); }); - _server.set_message_handler( [&]( connection_hdl hdl, typename websocketpp::server::message_ptr msg ){ + _server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){ _server_thread.async( [&](){ auto current_con = _connections.find(hdl); assert( current_con != _connections.end() ); - //wdump(("server")(msg->get_payload())); + wdump(("server")(msg->get_payload())); //std::cerr<<"recv: "<get_payload()<<"\n"; auto payload = msg->get_payload(); std::shared_ptr con = current_con->second; @@ -246,13 +178,13 @@ namespace fc { namespace http { _server.set_http_handler( [&]( connection_hdl hdl ){ _server_thread.async( [&](){ - auto current_con = std::make_shared::connection_ptr>>( _server.get_con_from_hdl(hdl) ); + auto current_con = std::make_shared>( _server.get_con_from_hdl(hdl) ); _on_connection( current_con ); auto con = _server.get_con_from_hdl(hdl); con->defer_http_response(); std::string request_body = con->get_request_body(); - //wdump(("server")(request_body)); + wdump(("server")(request_body)); fc::async([current_con, request_body, con] { std::string response = current_con->on_http(request_body); @@ -314,62 +246,132 @@ namespace fc { namespace http { if( _closed ) _closed->wait(); } - void on_connection( const on_connection_handler& handler ) override - { - _on_connection = handler; - } - - void listen( uint16_t port ) override - { - _server.listen(port); - } - - void listen( const fc::ip::endpoint& ep ) override - { - _server.listen( boost::asio::ip::tcp::endpoint( boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); - } - - void start_accept() override - { - _server.start_accept(); - } - typedef std::map > con_map; con_map _connections; fc::thread& _server_thread; - websocketpp::server _server; + websocket_server_type _server; on_connection_handler _on_connection; fc::promise::ptr _closed; uint32_t _pending_messages = 0; }; - template - class websocket_tls_server_impl : public websocket_server_impl + class websocket_tls_server_impl { public: websocket_tls_server_impl( const string& server_pem, const string& ssl_password ) + :_server_thread( fc::thread::current() ) { - this->_server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr { - context_ptr ctx = websocketpp::lib::make_shared(boost::asio::ssl::context::tlsv1); - try { - ctx->set_options(boost::asio::ssl::context::default_workarounds | - boost::asio::ssl::context::no_sslv2 | - boost::asio::ssl::context::no_sslv3 | - boost::asio::ssl::context::single_dh_use); - ctx->set_password_callback([=](std::size_t max_length, boost::asio::ssl::context::password_purpose){ return ssl_password;}); - ctx->use_certificate_chain_file(server_pem); - ctx->use_private_key_file(server_pem, boost::asio::ssl::context::pem); - } catch (std::exception& e) { - std::cout << e.what() << std::endl; - } - return ctx; + //if( server_pem.size() ) + { + _server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr { + context_ptr ctx = websocketpp::lib::make_shared(boost::asio::ssl::context::tlsv1); + try { + ctx->set_options(boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::no_sslv2 | + boost::asio::ssl::context::no_sslv3 | + boost::asio::ssl::context::single_dh_use); + ctx->set_password_callback([=](std::size_t max_length, boost::asio::ssl::context::password_purpose){ return ssl_password;}); + ctx->use_certificate_chain_file(server_pem); + ctx->use_private_key_file(server_pem, boost::asio::ssl::context::pem); + } catch (std::exception& e) { + std::cout << e.what() << std::endl; + } + return ctx; + }); + } + + _server.clear_access_channels( websocketpp::log::alevel::all ); + _server.init_asio(&fc::asio::default_io_service()); + _server.set_reuse_addr(true); + _server.set_open_handler( [&]( connection_hdl hdl ){ + _server_thread.async( [&](){ + auto new_con = std::make_shared>( _server.get_con_from_hdl(hdl) ); + _on_connection( _connections[hdl] = new_con ); + }).wait(); + }); + _server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){ + _server_thread.async( [&](){ + auto current_con = _connections.find(hdl); + assert( current_con != _connections.end() ); + auto received = msg->get_payload(); + std::shared_ptr con = current_con->second; + fc::async([con,received](){ con->on_message( received ); }); + }).wait(); + }); + + _server.set_http_handler( [&]( connection_hdl hdl ){ + _server_thread.async( [&](){ + + auto current_con = std::make_shared>( _server.get_con_from_hdl(hdl) ); + try{ + _on_connection( current_con ); + + auto con = _server.get_con_from_hdl(hdl); + wdump(("server")(con->get_request_body())); + auto response = current_con->on_http( con->get_request_body() ); + + con->set_body( response ); + con->set_status( websocketpp::http::status_code::ok ); + } catch ( const fc::exception& e ) + { + edump((e.to_detail_string())); + } + current_con->closed(); + + }).wait(); + }); + + _server.set_close_handler( [&]( connection_hdl hdl ){ + _server_thread.async( [&](){ + _connections[hdl]->closed(); + _connections.erase( hdl ); + }).wait(); + }); + + _server.set_fail_handler( [&]( connection_hdl hdl ){ + if( _server.is_listening() ) + { + _server_thread.async( [&](){ + if( _connections.find(hdl) != _connections.end() ) + { + _connections[hdl]->closed(); + _connections.erase( hdl ); + } + }).wait(); + } }); } + ~websocket_tls_server_impl() + { + if( _server.is_listening() ) + _server.stop_listening(); + auto cpy_con = _connections; + for( auto item : cpy_con ) + _server.close( item.first, 0, "server exit" ); + } + + typedef std::map > con_map; + + con_map _connections; + fc::thread& _server_thread; + websocket_tls_server_type _server; + on_connection_handler _on_connection; + fc::promise::ptr _closed; }; + + + + + + + + + + typedef websocketpp::client websocket_client_type; typedef websocketpp::client websocket_tls_client_type; @@ -512,63 +514,57 @@ namespace fc { namespace http { } // namespace detail - websocket_server::websocket_server(bool enable_permessage_deflate /* = true */) : - my( enable_permessage_deflate ? - (detail::abstract_websocket_server*)new detail::websocket_server_impl : - (detail::abstract_websocket_server*)new detail::websocket_server_impl ) - {} + websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {} websocket_server::~websocket_server(){} void websocket_server::on_connection( const on_connection_handler& handler ) { - my->on_connection(handler); + my->_on_connection = handler; } void websocket_server::listen( uint16_t port ) { - my->listen(port); + my->_server.listen(port); } void websocket_server::listen( const fc::ip::endpoint& ep ) { - my->listen(ep); + my->_server.listen( boost::asio::ip::tcp::endpoint( boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); } void websocket_server::start_accept() { - my->start_accept(); + my->_server.start_accept(); } - websocket_tls_server::websocket_tls_server(const string& server_pem, - const string& ssl_password, - bool enable_permessage_deflate /* = true */) : - my( enable_permessage_deflate ? - (detail::abstract_websocket_server*)new detail::websocket_tls_server_impl(server_pem, ssl_password) : - (detail::abstract_websocket_server*)new detail::websocket_tls_server_impl(server_pem, ssl_password) ) - {} + websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password ):my( new detail::websocket_tls_server_impl(server_pem, ssl_password) ) {} websocket_tls_server::~websocket_tls_server(){} void websocket_tls_server::on_connection( const on_connection_handler& handler ) { - my->on_connection(handler); + my->_on_connection = handler; } void websocket_tls_server::listen( uint16_t port ) { - my->listen(port); + my->_server.listen(port); } void websocket_tls_server::listen( const fc::ip::endpoint& ep ) { - my->listen(ep); + my->_server.listen( boost::asio::ip::tcp::endpoint( boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); } - void websocket_tls_server::start_accept() - { - my->start_accept(); + void websocket_tls_server::start_accept() { + my->_server.start_accept(); } + websocket_tls_client::websocket_tls_client():my( new detail::websocket_tls_client_impl() ) {} + websocket_tls_client::~websocket_tls_client(){ } + + + websocket_client::websocket_client():my( new detail::websocket_client_impl() ),smy(new detail::websocket_tls_client_impl()) {} websocket_client::~websocket_client(){ }