implement simple websocket wrapper ontop of websocketpp

This commit is contained in:
Daniel Larimer 2015-03-26 16:51:10 -04:00
parent 2977ca954f
commit 4ce26f068f
6 changed files with 289 additions and 2 deletions

3
.gitmodules vendored Normal file
View file

@ -0,0 +1,3 @@
[submodule "vendor/websocketpp"]
path = vendor/websocketpp
url = https://github.com/zaphoyd/websocketpp

View file

@ -84,7 +84,7 @@ IF(NOT "$ENV{OPENSSL_ROOT_DIR}" STREQUAL "")
ENDIF()
find_package(OpenSSL)
find_package(OpenSSL REQUIRED)
set( CMAKE_FIND_LIBRARY_SUFFIXES ${ORIGINAL_LIB_SUFFIXES} )
@ -157,6 +157,7 @@ set( fc_sources
src/network/udt_socket.cpp
src/network/http/http_connection.cpp
src/network/http/http_server.cpp
src/network/http/websocket.cpp
src/network/ntp.cpp
src/network/ip.cpp
src/network/rate_limiting.cpp
@ -188,6 +189,7 @@ list(APPEND sources "${CMAKE_CURRENT_BINARY_DIR}/git_revision.cpp")
list(APPEND sources ${fc_headers})
add_subdirectory( vendor/easylzma )
add_subdirectory( vendor/websocketpp )
#add_subdirectory( vendor/scrypt-jane )
add_subdirectory( vendor/udt4 )
@ -227,6 +229,7 @@ target_include_directories(fc
${CMAKE_CURRENT_SOURCE_DIR}/vendor/salsa20
${CMAKE_CURRENT_SOURCE_DIR}/vendor/easylzma/src
${CMAKE_CURRENT_SOURCE_DIR}/vendor/udt4/src
${CMAKE_CURRENT_SOURCE_DIR}/vendor/websocketpp
)
#target_link_libraries( fc PUBLIC easylzma_static scrypt udt ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${ZLIB_LIBRARIES} ${PLATFORM_SPECIFIC_LIBS} ${RPCRT4} ${CMAKE_DL_LIBS} ${rt_library})
@ -235,6 +238,10 @@ target_link_libraries( fc PUBLIC easylzma_static udt ${Boost_LIBRARIES} ${OPENSS
add_executable( api tests/api.cpp )
target_link_libraries( api fc )
include_directories( vendor/websocketpp )
add_executable( websockettest tests/websocket.cpp )
target_link_libraries( websockettest fc )
add_executable( ntp_test ntp_test.cpp )
target_link_libraries( ntp_test fc )
@ -245,7 +252,6 @@ target_link_libraries( task_cancel_test fc )
add_executable( real128_test tests/real128_test.cpp )
target_link_libraries( real128_test fc )
#include_directories( vendor/udt4/src )
add_executable( udt_server tests/udts.cpp )
target_link_libraries( udt_server fc udt )

View file

@ -0,0 +1,63 @@
#pragma once
#include <functional>
#include <memory>
#include <string>
namespace fc { namespace http {
namespace detail {
class websocket_server_impl;
class websocket_client_impl;
} // namespace detail;
class websocket_connection
{
public:
virtual ~websocket_connection(){};
virtual void send_message( const std::string& message ) = 0;
};
typedef std::shared_ptr<websocket_connection> websocket_connection_ptr;
class websocket_session
{
public:
websocket_session( const websocket_connection_ptr& con )
:_connection(con){}
virtual ~websocket_session(){};
virtual void on_message( const std::string& message ) = 0;
void send_message( const std::string& message ) { _connection->send_message(message); }
private:
websocket_connection_ptr _connection;
};
typedef std::shared_ptr<websocket_session> websocket_session_ptr;
typedef std::function< websocket_session_ptr( const websocket_connection_ptr& ) > session_factory;
class websocket_server
{
public:
websocket_server();
~websocket_server();
void on_connection( const session_factory& factory );
void listen( uint16_t port );
void start_accept();
private:
friend class detail::websocket_server_impl;
std::unique_ptr<detail::websocket_server_impl> my;
};
class websocket_client
{
public:
websocket_client();
~websocket_client();
websocket_session_ptr connect( const std::string& uri, const session_factory& );
private:
std::unique_ptr<detail::websocket_client_impl> my;
};
} }

View file

@ -0,0 +1,164 @@
#include <fc/network/http/websocket.hpp>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/config/asio.hpp>
#include <websocketpp/server.hpp>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp>
#include <fc/optional.hpp>
#include <fc/variant.hpp>
#include <fc/thread/thread.hpp>
#include <fc/asio.hpp>
namespace fc { namespace http {
namespace detail {
using websocketpp::connection_hdl;
typedef websocketpp::server<websocketpp::config::asio> websocket_server_type;
template<typename T>
class websocket_connection_impl : public websocket_connection
{
public:
websocket_connection_impl( T con )
:_ws_connection(con){}
virtual void send_message( const std::string& message )
{
_ws_connection->send( message );
}
T _ws_connection;
};
class websocket_server_impl
{
public:
websocket_server_impl()
:_server_thread( fc::thread::current() )
{
_server.init_asio(&fc::asio::default_io_service());
_server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){
wlog( "on open server" );
auto new_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) );
_connections[hdl] = _factory( new_con );
}).wait();
});
_server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){
_server_thread.async( [&](){
auto current_con = _connections.find(hdl);
assert( current_con != _connections.end() );
wdump(("server")(msg->get_payload()));
current_con->second->on_message( msg->get_payload() );
}).wait();
});
_server.set_close_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){
_connections.erase( hdl );
}).wait();
});
_server.set_fail_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){
_connections.erase( hdl );
}).wait();
});
}
typedef std::map<connection_hdl, websocket_session_ptr,std::owner_less<connection_hdl> > con_map;
con_map _connections;
fc::thread& _server_thread;
websocket_server_type _server;
session_factory _factory;
};
typedef websocketpp::client<websocketpp::config::asio_client> websocket_client_type;
typedef websocket_client_type::connection_ptr websocket_client_connection_type;
class websocket_client_impl
{
public:
typedef websocket_client_type::message_ptr message_ptr;
websocket_client_impl()
:_client_thread( fc::thread::current() )
{
_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
elog( "default open client" );
});
_client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){
wlog("start wait");
_client_thread.async( [&](){
wdump((msg->get_payload()));
_session->on_message( msg->get_payload() );
}).wait();
wlog("done wait");
});
_client.set_close_handler( [=]( connection_hdl hdl ){
wlog("start wait");
_client_thread.async( [&](){ _session.reset(); } ).wait();
wlog("done wait");
});
_client.set_fail_handler( [=]( connection_hdl hdl ){
wlog("start wait");
_client_thread.async( [&](){ _session.reset(); } ).wait();
wlog("done wait");
});
_client.init_asio( &fc::asio::default_io_service() );
}
fc::promise<void>::ptr _connected;
fc::thread& _client_thread;
websocket_client_type _client;
websocket_session_ptr _session;
websocket_connection_ptr _connection;
};
} // namespace detail
websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {}
websocket_server::~websocket_server(){}
void websocket_server::on_connection( const session_factory& factory )
{
my->_factory = factory;
}
void websocket_server::listen( uint16_t port )
{
my->_server.listen(port);
}
void websocket_server::start_accept() {
my->_server.start_accept();
}
websocket_client::websocket_client():my( new detail::websocket_client_impl() ) {}
websocket_client::~websocket_client(){}
websocket_session_ptr websocket_client::connect( const std::string& uri, const session_factory& factory )
{ try {
wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec;
my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") );
my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
auto con = my->_client.get_con_from_hdl(hdl);
my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_client_connection_type>>( con );
my->_session = factory( my->_connection );
my->_connected->set_value();
});
auto con = my->_client.get_connection( uri, ec );
if( ec )
{
FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
}
my->_client.connect(con);
my->_connected->wait();
return my->_session;
} FC_CAPTURE_AND_RETHROW( (uri) ) }
} } // fc::http

50
tests/websocket.cpp Normal file
View file

@ -0,0 +1,50 @@
#include <fc/network/http/websocket.hpp>
#include <fc/log/logger.hpp>
#include <fc/thread/thread.hpp>
using namespace fc::http;
class echo_session : public fc::http::websocket_session
{
public:
echo_session( const websocket_connection_ptr c ):fc::http::websocket_session(c){}
void on_message( const std::string& message )
{
idump((message));
if( message.size() < 64 )
send_message( "echo " + message );
}
};
int main( int argc, char** argv )
{
try {
auto create_session = [&]( const websocket_connection_ptr& c ){
return std::make_shared<echo_session>(c);
};
fc::http::websocket_server server;
server.on_connection(create_session);
server.listen( 8090 );
server.start_accept();
fc::http::websocket_client client;
auto session = client.connect( "ws://localhost:8090", create_session );
wlog( "connected" );
session->send_message( "hello world" );
fc::usleep( fc::seconds(2) );
return 0;
}
/*
catch ( const websocketpp::lib::error_code& e )
{
edump( (e.message()) );
}
*/
catch ( const fc::exception& e )
{
edump((e.to_detail_string()));
}
}

1
vendor/websocketpp vendored Submodule

@ -0,0 +1 @@
Subproject commit 13f6da6f81207ae7e67f0e7d25ed0e3cc2ec2f9c