diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..cccd56b --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "vendor/websocketpp"] + path = vendor/websocketpp + url = https://github.com/zaphoyd/websocketpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f91dc0..6bce98e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -84,7 +84,7 @@ IF(NOT "$ENV{OPENSSL_ROOT_DIR}" STREQUAL "") ENDIF() -find_package(OpenSSL) +find_package(OpenSSL REQUIRED) set( CMAKE_FIND_LIBRARY_SUFFIXES ${ORIGINAL_LIB_SUFFIXES} ) @@ -157,6 +157,7 @@ set( fc_sources src/network/udt_socket.cpp src/network/http/http_connection.cpp src/network/http/http_server.cpp + src/network/http/websocket.cpp src/network/ntp.cpp src/network/ip.cpp src/network/rate_limiting.cpp @@ -188,6 +189,7 @@ list(APPEND sources "${CMAKE_CURRENT_BINARY_DIR}/git_revision.cpp") list(APPEND sources ${fc_headers}) add_subdirectory( vendor/easylzma ) +add_subdirectory( vendor/websocketpp ) #add_subdirectory( vendor/scrypt-jane ) add_subdirectory( vendor/udt4 ) @@ -227,6 +229,7 @@ target_include_directories(fc ${CMAKE_CURRENT_SOURCE_DIR}/vendor/salsa20 ${CMAKE_CURRENT_SOURCE_DIR}/vendor/easylzma/src ${CMAKE_CURRENT_SOURCE_DIR}/vendor/udt4/src + ${CMAKE_CURRENT_SOURCE_DIR}/vendor/websocketpp ) #target_link_libraries( fc PUBLIC easylzma_static scrypt udt ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${ZLIB_LIBRARIES} ${PLATFORM_SPECIFIC_LIBS} ${RPCRT4} ${CMAKE_DL_LIBS} ${rt_library}) @@ -235,6 +238,10 @@ target_link_libraries( fc PUBLIC easylzma_static udt ${Boost_LIBRARIES} ${OPENSS add_executable( api tests/api.cpp ) target_link_libraries( api fc ) +include_directories( vendor/websocketpp ) +add_executable( websockettest tests/websocket.cpp ) +target_link_libraries( websockettest fc ) + add_executable( ntp_test ntp_test.cpp ) target_link_libraries( ntp_test fc ) @@ -245,7 +252,6 @@ target_link_libraries( task_cancel_test fc ) add_executable( real128_test tests/real128_test.cpp ) target_link_libraries( real128_test fc ) -#include_directories( vendor/udt4/src ) add_executable( udt_server tests/udts.cpp ) target_link_libraries( udt_server fc udt ) diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp new file mode 100644 index 0000000..19eca30 --- /dev/null +++ b/include/fc/network/http/websocket.hpp @@ -0,0 +1,63 @@ +#pragma once +#include +#include +#include + +namespace fc { namespace http { + namespace detail { + class websocket_server_impl; + class websocket_client_impl; + } // namespace detail; + + class websocket_connection + { + public: + virtual ~websocket_connection(){}; + virtual void send_message( const std::string& message ) = 0; + }; + typedef std::shared_ptr websocket_connection_ptr; + + class websocket_session + { + public: + websocket_session( const websocket_connection_ptr& con ) + :_connection(con){} + + virtual ~websocket_session(){}; + virtual void on_message( const std::string& message ) = 0; + + void send_message( const std::string& message ) { _connection->send_message(message); } + private: + websocket_connection_ptr _connection; + }; + typedef std::shared_ptr websocket_session_ptr; + + typedef std::function< websocket_session_ptr( const websocket_connection_ptr& ) > session_factory; + + class websocket_server + { + public: + websocket_server(); + ~websocket_server(); + + void on_connection( const session_factory& factory ); + void listen( uint16_t port ); + void start_accept(); + + private: + friend class detail::websocket_server_impl; + std::unique_ptr my; + }; + + class websocket_client + { + public: + websocket_client(); + ~websocket_client(); + + websocket_session_ptr connect( const std::string& uri, const session_factory& ); + private: + std::unique_ptr my; + }; + +} } diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp new file mode 100644 index 0000000..bef4baa --- /dev/null +++ b/src/network/http/websocket.cpp @@ -0,0 +1,164 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace fc { namespace http { + + namespace detail { + using websocketpp::connection_hdl; + typedef websocketpp::server websocket_server_type; + + template + class websocket_connection_impl : public websocket_connection + { + public: + websocket_connection_impl( T con ) + :_ws_connection(con){} + + virtual void send_message( const std::string& message ) + { + _ws_connection->send( message ); + } + + T _ws_connection; + }; + + + class websocket_server_impl + { + public: + websocket_server_impl() + :_server_thread( fc::thread::current() ) + { + _server.init_asio(&fc::asio::default_io_service()); + _server.set_reuse_addr(true); + _server.set_open_handler( [&]( connection_hdl hdl ){ + _server_thread.async( [&](){ + wlog( "on open server" ); + auto new_con = std::make_shared>( _server.get_con_from_hdl(hdl) ); + _connections[hdl] = _factory( new_con ); + }).wait(); + }); + _server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){ + _server_thread.async( [&](){ + auto current_con = _connections.find(hdl); + assert( current_con != _connections.end() ); + wdump(("server")(msg->get_payload())); + current_con->second->on_message( msg->get_payload() ); + }).wait(); + }); + _server.set_close_handler( [&]( connection_hdl hdl ){ + _server_thread.async( [&](){ + _connections.erase( hdl ); + }).wait(); + }); + + _server.set_fail_handler( [&]( connection_hdl hdl ){ + _server_thread.async( [&](){ + _connections.erase( hdl ); + }).wait(); + }); + } + + typedef std::map > con_map; + + con_map _connections; + fc::thread& _server_thread; + websocket_server_type _server; + session_factory _factory; + }; + + typedef websocketpp::client websocket_client_type; + typedef websocket_client_type::connection_ptr websocket_client_connection_type; + + class websocket_client_impl + { + public: + typedef websocket_client_type::message_ptr message_ptr; + + websocket_client_impl() + :_client_thread( fc::thread::current() ) + { + _client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ + elog( "default open client" ); + }); + _client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){ + wlog("start wait"); + _client_thread.async( [&](){ + wdump((msg->get_payload())); + _session->on_message( msg->get_payload() ); + }).wait(); + wlog("done wait"); + }); + _client.set_close_handler( [=]( connection_hdl hdl ){ + wlog("start wait"); + _client_thread.async( [&](){ _session.reset(); } ).wait(); + wlog("done wait"); + }); + _client.set_fail_handler( [=]( connection_hdl hdl ){ + wlog("start wait"); + _client_thread.async( [&](){ _session.reset(); } ).wait(); + wlog("done wait"); + }); + _client.init_asio( &fc::asio::default_io_service() ); + } + fc::promise::ptr _connected; + fc::thread& _client_thread; + websocket_client_type _client; + websocket_session_ptr _session; + websocket_connection_ptr _connection; + }; + } // namespace detail + + websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {} + websocket_server::~websocket_server(){} + + void websocket_server::on_connection( const session_factory& factory ) + { + my->_factory = factory; + } + + void websocket_server::listen( uint16_t port ) + { + my->_server.listen(port); + } + + void websocket_server::start_accept() { + my->_server.start_accept(); + } + + websocket_client::websocket_client():my( new detail::websocket_client_impl() ) {} + websocket_client::~websocket_client(){} + websocket_session_ptr websocket_client::connect( const std::string& uri, const session_factory& factory ) + { try { + wlog( "connecting to ${uri}", ("uri",uri)); + websocketpp::lib::error_code ec; + + my->_connected = fc::promise::ptr( new fc::promise("websocket::connect") ); + + my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ + auto con = my->_client.get_con_from_hdl(hdl); + my->_connection = std::make_shared>( con ); + my->_session = factory( my->_connection ); + my->_connected->set_value(); + }); + + auto con = my->_client.get_connection( uri, ec ); + if( ec ) + { + FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); + } + my->_client.connect(con); + my->_connected->wait(); + return my->_session; + } FC_CAPTURE_AND_RETHROW( (uri) ) } + +} } // fc::http diff --git a/tests/websocket.cpp b/tests/websocket.cpp new file mode 100644 index 0000000..766d699 --- /dev/null +++ b/tests/websocket.cpp @@ -0,0 +1,50 @@ +#include +#include +#include + +using namespace fc::http; + +class echo_session : public fc::http::websocket_session +{ + public: + echo_session( const websocket_connection_ptr c ):fc::http::websocket_session(c){} + void on_message( const std::string& message ) + { + idump((message)); + if( message.size() < 64 ) + send_message( "echo " + message ); + } +}; + + +int main( int argc, char** argv ) +{ + try { + auto create_session = [&]( const websocket_connection_ptr& c ){ + return std::make_shared(c); + }; + fc::http::websocket_server server; + server.on_connection(create_session); + + server.listen( 8090 ); + server.start_accept(); + + fc::http::websocket_client client; + auto session = client.connect( "ws://localhost:8090", create_session ); + wlog( "connected" ); + session->send_message( "hello world" ); + + fc::usleep( fc::seconds(2) ); + return 0; + } + /* + catch ( const websocketpp::lib::error_code& e ) + { + edump( (e.message()) ); + } + */ + catch ( const fc::exception& e ) + { + edump((e.to_detail_string())); + } +} diff --git a/vendor/websocketpp b/vendor/websocketpp new file mode 160000 index 0000000..13f6da6 --- /dev/null +++ b/vendor/websocketpp @@ -0,0 +1 @@ +Subproject commit 13f6da6f81207ae7e67f0e7d25ed0e3cc2ec2f9c