Merge pull request #134 from bitshares/jmj_844b

Log host information from websocket requests
This commit is contained in:
Abit 2020-05-02 14:03:15 +02:00 committed by GitHub
commit c00824a137
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 314 additions and 117 deletions

View file

@ -15,6 +15,8 @@ namespace fc { namespace http {
class websocket_tls_client_impl; class websocket_tls_client_impl;
} // namespace detail } // namespace detail
// TODO refactor code, move stuff used by server or client only to derived class(es),
// E.G. it seems get_request_header and on_http* are for server only.
class websocket_connection class websocket_connection
{ {
public: public:
@ -32,7 +34,11 @@ namespace fc { namespace http {
virtual std::string get_request_header(const std::string& key) = 0; virtual std::string get_request_header(const std::string& key) = 0;
const std::string& get_remote_endpoint_string()const { return _remote_endpoint; }
fc::signal<void()> closed; fc::signal<void()> closed;
protected:
std::string _remote_endpoint; // for logging
private: private:
boost::any _session_data; boost::any _session_data;
std::function<void(const std::string&)> _on_message; std::function<void(const std::string&)> _on_message;
@ -45,7 +51,7 @@ namespace fc { namespace http {
class websocket_server class websocket_server
{ {
public: public:
websocket_server(); websocket_server(const std::string& forward_header_key);
~websocket_server(); ~websocket_server();
void on_connection( const on_connection_handler& handler); void on_connection( const on_connection_handler& handler);
@ -66,15 +72,15 @@ namespace fc { namespace http {
class websocket_tls_server class websocket_tls_server
{ {
public: public:
websocket_tls_server( const std::string& server_pem = std::string(), websocket_tls_server( const std::string& server_pem,
const std::string& ssl_password = std::string()); const std::string& ssl_password,
const std::string& forward_header_key );
~websocket_tls_server(); ~websocket_tls_server();
void on_connection( const on_connection_handler& handler); void on_connection( const on_connection_handler& handler);
void listen( uint16_t port ); void listen( uint16_t port );
void listen( const fc::ip::endpoint& ep ); void listen( const fc::ip::endpoint& ep );
void start_accept(); void start_accept();
private: private:
friend class detail::websocket_tls_server_impl; friend class detail::websocket_tls_server_impl;
std::unique_ptr<detail::websocket_tls_server_impl> my; std::unique_ptr<detail::websocket_tls_server_impl> my;
@ -91,19 +97,11 @@ namespace fc { namespace http {
void close(); void close();
void synchronous_close(); void synchronous_close();
void append_header(const std::string& key, const std::string& value);
private: private:
std::unique_ptr<detail::websocket_client_impl> my; std::unique_ptr<detail::websocket_client_impl> my;
std::unique_ptr<detail::websocket_tls_client_impl> smy; std::unique_ptr<detail::websocket_tls_client_impl> smy;
}; std::vector<std::pair<std::string,std::string>> headers;
class websocket_tls_client
{
public:
websocket_tls_client( const std::string& ca_filename = "_default" );
~websocket_tls_client();
websocket_connection_ptr connect( const std::string& uri );
private:
std::unique_ptr<detail::websocket_tls_client_impl> my;
}; };
} } } }

View file

@ -99,6 +99,13 @@ namespace fc {
void logger::add_appender( const appender::ptr& a ) void logger::add_appender( const appender::ptr& a )
{ my->_appenders.push_back(a); } { my->_appenders.push_back(a); }
void logger::remove_appender( const appender::ptr& a )
{
auto item = std::find(my->_appenders.begin(), my->_appenders.end(), a);
if (item != my->_appenders.end())
my->_appenders.erase(item);
}
std::vector<appender::ptr> logger::get_appenders()const std::vector<appender::ptr> logger::get_appenders()const
{ {
return my->_appenders; return my->_appenders;

View file

@ -73,14 +73,6 @@ namespace fc { namespace http {
typedef base::con_msg_manager_type con_msg_manager_type; typedef base::con_msg_manager_type con_msg_manager_type;
typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; typedef base::endpoint_msg_manager_type endpoint_msg_manager_type;
/// Custom Logging policies
/*typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::elevel> elog_type;
typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::alevel> alog_type;
*/
//typedef base::alog_type alog_type;
//typedef base::elog_type elog_type;
typedef websocketpp::log::stub elog_type; typedef websocketpp::log::stub elog_type;
typedef websocketpp::log::stub alog_type; typedef websocketpp::log::stub alog_type;
@ -124,14 +116,6 @@ namespace fc { namespace http {
typedef base::con_msg_manager_type con_msg_manager_type; typedef base::con_msg_manager_type con_msg_manager_type;
typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; typedef base::endpoint_msg_manager_type endpoint_msg_manager_type;
/// Custom Logging policies
/*typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::elevel> elog_type;
typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::alevel> alog_type;
*/
//typedef base::alog_type alog_type;
//typedef base::elog_type elog_type;
typedef websocketpp::log::stub elog_type; typedef websocketpp::log::stub elog_type;
typedef websocketpp::log::stub alog_type; typedef websocketpp::log::stub alog_type;
@ -164,8 +148,6 @@ namespace fc { namespace http {
typedef base::con_msg_manager_type con_msg_manager_type; typedef base::con_msg_manager_type con_msg_manager_type;
typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; typedef base::endpoint_msg_manager_type endpoint_msg_manager_type;
//typedef base::alog_type alog_type;
//typedef base::elog_type elog_type;
typedef websocketpp::log::stub elog_type; typedef websocketpp::log::stub elog_type;
typedef websocketpp::log::stub alog_type; typedef websocketpp::log::stub alog_type;
@ -184,10 +166,6 @@ namespace fc { namespace http {
transport_type; transport_type;
}; };
using websocketpp::connection_hdl; using websocketpp::connection_hdl;
typedef websocketpp::server<asio_with_stub_log> websocket_server_type; typedef websocketpp::server<asio_with_stub_log> websocket_server_type;
typedef websocketpp::server<asio_tls_stub_log> websocket_tls_server_type; typedef websocketpp::server<asio_tls_stub_log> websocket_tls_server_type;
@ -196,8 +174,9 @@ namespace fc { namespace http {
class websocket_connection_impl : public websocket_connection class websocket_connection_impl : public websocket_connection
{ {
public: public:
websocket_connection_impl( T con ) websocket_connection_impl( T con ) : _ws_connection(con)
:_ws_connection(con){ {
_remote_endpoint = con->get_remote_endpoint();
} }
virtual ~websocket_connection_impl() virtual ~websocket_connection_impl()
@ -206,8 +185,8 @@ namespace fc { namespace http {
virtual void send_message( const std::string& message )override virtual void send_message( const std::string& message )override
{ {
idump((message)); ilog( "[OUT] ${remote_endpoint} ${msg}",
//std::cerr<<"send: "<<message<<"\n"; ("remote_endpoint",_remote_endpoint) ("msg",message) );
auto ec = _ws_connection->send( message ); auto ec = _ws_connection->send( message );
FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) ); FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) );
} }
@ -224,21 +203,44 @@ namespace fc { namespace http {
T _ws_connection; T _ws_connection;
}; };
template<typename T>
class possibly_proxied_websocket_connection : public websocket_connection_impl<T>
{
public:
possibly_proxied_websocket_connection( T con, const std::string& forward_header_key )
: websocket_connection_impl<T>(con)
{
// By calling the parent's constructor, _remote_endpoint has been initialized.
// Now try to extract remote address from the header, if found, overwrite it
if( !forward_header_key.empty() )
{
const std::string value = this->get_request_header( forward_header_key );
if( !value.empty() )
this->_remote_endpoint = value;
}
}
virtual ~possibly_proxied_websocket_connection()
{
}
};
typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr; typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr;
class websocket_server_impl class websocket_server_impl
{ {
public: public:
websocket_server_impl() websocket_server_impl( const std::string& forward_header_key )
:_server_thread( fc::thread::current() ) :_server_thread( fc::thread::current() ), _forward_header_key(forward_header_key)
{ {
_server.clear_access_channels( websocketpp::log::alevel::all ); _server.clear_access_channels( websocketpp::log::alevel::all );
_server.init_asio(&fc::asio::default_io_service()); _server.init_asio(&fc::asio::default_io_service());
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){ _server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [this, hdl](){
auto new_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto new_con = std::make_shared<possibly_proxied_websocket_connection<
websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl),
_forward_header_key );
_on_connection( _connections[hdl] = new_con ); _on_connection( _connections[hdl] = new_con );
}).wait(); }).wait();
}); });
@ -246,11 +248,16 @@ namespace fc { namespace http {
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = _connections.find(hdl); auto current_con = _connections.find(hdl);
assert( current_con != _connections.end() ); assert( current_con != _connections.end() );
wdump(("server")(msg->get_payload()));
auto payload = msg->get_payload(); auto payload = msg->get_payload();
std::shared_ptr<websocket_connection> con = current_con->second; std::shared_ptr<websocket_connection> con = current_con->second;
wlog( "[IN] ${remote_endpoint} ${msg}",
("remote_endpoint",con->get_remote_endpoint_string()) ("msg",payload) );
++_pending_messages; ++_pending_messages;
auto f = fc::async([this,con,payload](){ if( _pending_messages ) --_pending_messages; con->on_message( payload ); }); auto f = fc::async([this,con,payload](){
if( _pending_messages )
--_pending_messages;
con->on_message( payload );
});
if( _pending_messages > 100 ) if( _pending_messages > 100 )
f.wait(); f.wait();
}).wait(); }).wait();
@ -263,17 +270,23 @@ namespace fc { namespace http {
_server.set_http_handler( [&]( connection_hdl hdl ){ _server.set_http_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto con = _server.get_con_from_hdl(hdl);
auto current_con = std::make_shared<possibly_proxied_websocket_connection<
websocket_server_type::connection_ptr>>( con, _forward_header_key );
_on_connection( current_con ); _on_connection( current_con );
auto con = _server.get_con_from_hdl(hdl);
con->defer_http_response(); con->defer_http_response();
std::string remote_endpoint = current_con->get_remote_endpoint_string();
std::string request_body = con->get_request_body(); std::string request_body = con->get_request_body();
wdump(("server")(request_body)); wlog( "[HTTP-IN] ${remote_endpoint} ${msg}",
("remote_endpoint",remote_endpoint) ("msg",request_body) );
fc::async([current_con, request_body, con] { fc::async([current_con, request_body, con, remote_endpoint] {
fc::http::reply response = current_con->on_http(request_body); fc::http::reply response = current_con->on_http(request_body);
idump( (response) ); ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}",
("remote_endpoint",remote_endpoint)
("status",response.status)
("msg",response.body_as_string) );
con->set_body( std::move( response.body_as_string ) ); con->set_body( std::move( response.body_as_string ) );
con->set_status( websocketpp::http::status_code::value(response.status) ); con->set_status( websocketpp::http::status_code::value(response.status) );
con->send_http_response(); con->send_http_response();
@ -340,15 +353,16 @@ namespace fc { namespace http {
on_connection_handler _on_connection; on_connection_handler _on_connection;
fc::promise<void>::ptr _closed; fc::promise<void>::ptr _closed;
uint32_t _pending_messages = 0; uint32_t _pending_messages = 0;
std::string _forward_header_key; // A header like "X-Forwarded-For" (XFF), with data IP:port
}; };
class websocket_tls_server_impl class websocket_tls_server_impl
{ {
public: public:
websocket_tls_server_impl( const string& server_pem, const string& ssl_password ) websocket_tls_server_impl( const string& server_pem, const string& ssl_password,
:_server_thread( fc::thread::current() ) const std::string& forward_header_key )
:_server_thread( fc::thread::current() ), _forward_header_key(forward_header_key)
{ {
//if( server_pem.size() )
{ {
_server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr { _server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr {
context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::tlsv1); context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::tlsv1);
@ -372,7 +386,9 @@ namespace fc { namespace http {
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){ _server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto new_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto new_con = std::make_shared<possibly_proxied_websocket_connection<
websocket_tls_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl),
_forward_header_key );
_on_connection( _connections[hdl] = new_con ); _on_connection( _connections[hdl] = new_con );
}).wait(); }).wait();
}); });
@ -382,6 +398,8 @@ namespace fc { namespace http {
assert( current_con != _connections.end() ); assert( current_con != _connections.end() );
auto received = msg->get_payload(); auto received = msg->get_payload();
std::shared_ptr<websocket_connection> con = current_con->second; std::shared_ptr<websocket_connection> con = current_con->second;
wlog( "[IN] ${remote_endpoint} ${msg}",
("remote_endpoint",con->get_remote_endpoint_string()) ("msg",received) );
fc::async([con,received](){ con->on_message( received ); }); fc::async([con,received](){ con->on_message( received ); });
}).wait(); }).wait();
}); });
@ -389,14 +407,21 @@ namespace fc { namespace http {
_server.set_http_handler( [&]( connection_hdl hdl ){ _server.set_http_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto con = _server.get_con_from_hdl(hdl);
auto current_con = std::make_shared<possibly_proxied_websocket_connection<
websocket_tls_server_type::connection_ptr>>( con, _forward_header_key );
try{ try{
_on_connection( current_con ); _on_connection( current_con );
auto con = _server.get_con_from_hdl(hdl); std::string remote_endpoint = current_con->get_remote_endpoint_string();
wdump(("server")(con->get_request_body())); std::string request_body = con->get_request_body();
auto response = current_con->on_http( con->get_request_body() ); wlog( "[HTTP-IN] ${remote_endpoint} ${msg}",
idump((response)); ("remote_endpoint",remote_endpoint) ("msg",request_body) );
auto response = current_con->on_http( request_body );
ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}",
("remote_endpoint",remote_endpoint)
("status",response.status)
("msg",response.body_as_string) );
con->set_body( std::move( response.body_as_string ) ); con->set_body( std::move( response.body_as_string ) );
con->set_status( websocketpp::http::status_code::value( response.status ) ); con->set_status( websocketpp::http::status_code::value( response.status ) );
} catch ( const fc::exception& e ) } catch ( const fc::exception& e )
@ -428,6 +453,7 @@ namespace fc { namespace http {
} }
}); });
} }
~websocket_tls_server_impl() ~websocket_tls_server_impl()
{ {
if( _server.is_listening() ) if( _server.is_listening() )
@ -444,26 +470,14 @@ namespace fc { namespace http {
websocket_tls_server_type _server; websocket_tls_server_type _server;
on_connection_handler _on_connection; on_connection_handler _on_connection;
fc::promise<void>::ptr _closed; fc::promise<void>::ptr _closed;
std::string _forward_header_key; // A header like "X-Forwarded-For" (XFF), with data IP:port
}; };
typedef websocketpp::client<asio_with_stub_log> websocket_client_type; typedef websocketpp::client<asio_with_stub_log> websocket_client_type;
typedef websocketpp::client<asio_tls_stub_log> websocket_tls_client_type; typedef websocketpp::client<asio_tls_stub_log> websocket_tls_client_type;
typedef websocket_client_type::connection_ptr websocket_client_connection_type; typedef websocket_client_type::connection_ptr websocket_client_connection_type;
typedef websocket_tls_client_type::connection_ptr websocket_tls_client_connection_type; typedef websocket_tls_client_type::connection_ptr websocket_tls_client_connection_type;
using websocketpp::connection_hdl;
template<typename T> template<typename T>
class generic_websocket_client_impl class generic_websocket_client_impl
@ -477,7 +491,6 @@ namespace fc { namespace http {
typename websocketpp::client<T>::message_ptr msg ){ typename websocketpp::client<T>::message_ptr msg ){
_client_thread.async( [&](){ _client_thread.async( [&](){
wdump((msg->get_payload())); wdump((msg->get_payload()));
//std::cerr<<"recv: "<<msg->get_payload()<<"\n";
auto received = msg->get_payload(); auto received = msg->get_payload();
fc::async( [=](){ fc::async( [=](){
if( _connection ) if( _connection )
@ -598,7 +611,8 @@ namespace fc { namespace http {
} // namespace detail } // namespace detail
websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {} websocket_server::websocket_server( const std::string& forward_header_key )
:my( new detail::websocket_server_impl( forward_header_key ) ) {}
websocket_server::~websocket_server(){} websocket_server::~websocket_server(){}
void websocket_server::on_connection( const on_connection_handler& handler ) void websocket_server::on_connection( const on_connection_handler& handler )
@ -636,9 +650,11 @@ namespace fc { namespace http {
my->_server.close(connection.first, websocketpp::close::status::normal, "Goodbye"); my->_server.close(connection.first, websocketpp::close::status::normal, "Goodbye");
} }
websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password,
const std::string& forward_header_key )
:my( new detail::websocket_tls_server_impl(server_pem, ssl_password, forward_header_key) )
{}
websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password ):my( new detail::websocket_tls_server_impl(server_pem, ssl_password) ) {}
websocket_tls_server::~websocket_tls_server(){} websocket_tls_server::~websocket_tls_server(){}
void websocket_tls_server::on_connection( const on_connection_handler& handler ) void websocket_tls_server::on_connection( const on_connection_handler& handler )
@ -660,12 +676,11 @@ namespace fc { namespace http {
} }
websocket_tls_client::websocket_tls_client( const std::string& ca_filename ):my( new detail::websocket_tls_client_impl( ca_filename ) ) {} websocket_client::websocket_client( const std::string& ca_filename )
websocket_tls_client::~websocket_tls_client(){ } :my( new detail::websocket_client_impl() ),
smy(new detail::websocket_tls_client_impl( ca_filename ))
{}
websocket_client::websocket_client( const std::string& ca_filename ):my( new detail::websocket_client_impl() ),smy(new detail::websocket_tls_client_impl( ca_filename )) {}
websocket_client::~websocket_client(){ } websocket_client::~websocket_client(){ }
websocket_connection_ptr websocket_client::connect( const std::string& uri ) websocket_connection_ptr websocket_client::connect( const std::string& uri )
@ -674,7 +689,6 @@ namespace fc { namespace http {
return secure_connect(uri); return secure_connect(uri);
FC_ASSERT( uri.substr(0,3) == "ws:" ); FC_ASSERT( uri.substr(0,3) == "ws:" );
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
my->_uri = uri; my->_uri = uri;
@ -683,13 +697,17 @@ namespace fc { namespace http {
my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
my->_hdl = hdl; my->_hdl = hdl;
auto con = my->_client.get_con_from_hdl(hdl); auto con = my->_client.get_con_from_hdl(hdl);
my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_client_connection_type>>( con ); my->_connection = std::make_shared<detail::websocket_connection_impl<
detail::websocket_client_connection_type>>( con );
my->_closed = promise<void>::create("websocket::closed"); my->_closed = promise<void>::create("websocket::closed");
my->_connected->set_value(); my->_connected->set_value();
}); });
auto con = my->_client.get_connection( uri, ec ); auto con = my->_client.get_connection( uri, ec );
std::for_each(headers.begin(), headers.end(), [con](std::pair<std::string, std::string> in) {
con->append_header(in.first, in.second);
});
if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
my->_client.connect(con); my->_client.connect(con);
@ -697,25 +715,30 @@ namespace fc { namespace http {
return my->_connection; return my->_connection;
} FC_CAPTURE_AND_RETHROW( (uri) ) } } FC_CAPTURE_AND_RETHROW( (uri) ) }
// TODO most code in this function is same as ::connect, best refactor
websocket_connection_ptr websocket_client::secure_connect( const std::string& uri ) websocket_connection_ptr websocket_client::secure_connect( const std::string& uri )
{ try { { try {
if( uri.substr(0,3) == "ws:" ) if( uri.substr(0,3) == "ws:" )
return connect(uri); return connect(uri);
FC_ASSERT( uri.substr(0,4) == "wss:" ); FC_ASSERT( uri.substr(0,4) == "wss:" );
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
smy->_uri = uri; smy->_uri = uri;
smy->_connected = promise<void>::create("websocket::connect"); smy->_connected = promise<void>::create("websocket::connect");
smy->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ smy->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
smy->_hdl = hdl;
auto con = smy->_client.get_con_from_hdl(hdl); auto con = smy->_client.get_con_from_hdl(hdl);
smy->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_tls_client_connection_type>>( con ); smy->_connection = std::make_shared<detail::websocket_connection_impl<
detail::websocket_tls_client_connection_type>>( con );
smy->_closed = promise<void>::create("websocket::closed"); smy->_closed = promise<void>::create("websocket::closed");
smy->_connected->set_value(); smy->_connected->set_value();
}); });
auto con = smy->_client.get_connection( uri, ec ); auto con = smy->_client.get_connection( uri, ec );
std::for_each(headers.begin(), headers.end(), [con](std::pair<std::string, std::string> in) {
con->append_header(in.first, in.second);
});
if( ec ) if( ec )
FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
smy->_client.connect(con); smy->_client.connect(con);
@ -736,28 +759,9 @@ namespace fc { namespace http {
my->_closed->wait(); my->_closed->wait();
} }
websocket_connection_ptr websocket_tls_client::connect( const std::string& uri ) void websocket_client::append_header(const std::string& key, const std::string& value)
{ try {
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec;
my->_connected = promise<void>::create("websocket::connect");
my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
auto con = my->_client.get_con_from_hdl(hdl);
my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_tls_client_connection_type>>( con );
my->_closed = promise<void>::create("websocket::closed");
my->_connected->set_value();
});
auto con = my->_client.get_connection( uri, ec );
if( ec )
{ {
FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); headers.push_back( std::make_pair(key, value) );
} }
my->_client.connect(con);
my->_connected->wait();
return my->_connection;
} FC_CAPTURE_AND_RETHROW( (uri) ) }
} } // fc::http } } // fc::http

View file

@ -16,6 +16,12 @@ target_link_libraries( hmac_test fc )
add_executable( ecc_test crypto/ecc_test.cpp ) add_executable( ecc_test crypto/ecc_test.cpp )
target_link_libraries( ecc_test fc ) target_link_libraries( ecc_test fc )
add_executable( ws_test_server ws_test_server.cpp )
target_link_libraries( ws_test_server fc )
add_executable( ws_test_client ws_test_client.cpp )
target_link_libraries( ws_test_client fc )
#add_executable( test_aes aes_test.cpp ) #add_executable( test_aes aes_test.cpp )
#target_link_libraries( test_aes fc ${rt_library} ${pthread_library} ) #target_link_libraries( test_aes fc ${rt_library} ${pthread_library} )
#add_executable( test_sleep sleep.cpp ) #add_executable( test_sleep sleep.cpp )

View file

@ -69,7 +69,7 @@ BOOST_AUTO_TEST_CASE(login_test) {
try { try {
fc::api<fc::test::calculator> calc_api( std::make_shared<fc::test::some_calculator>() ); fc::api<fc::test::calculator> calc_api( std::make_shared<fc::test::some_calculator>() );
auto server = std::make_shared<fc::http::websocket_server>(); auto server = std::make_shared<fc::http::websocket_server>("");
server->on_connection([&]( const websocket_connection_ptr& c ){ server->on_connection([&]( const websocket_connection_ptr& c ){
auto wsc = std::make_shared<websocket_api_connection>(c, MAX_DEPTH); auto wsc = std::make_shared<websocket_api_connection>(c, MAX_DEPTH);
auto login = std::make_shared<fc::test::login_api>(); auto login = std::make_shared<fc::test::login_api>();
@ -116,7 +116,7 @@ BOOST_AUTO_TEST_CASE(optionals_test) {
BOOST_CHECK_EQUAL(oapi->bar("a", "b", "c"), "[\"a\",\"b\",\"c\"]"); BOOST_CHECK_EQUAL(oapi->bar("a", "b", "c"), "[\"a\",\"b\",\"c\"]");
BOOST_CHECK_EQUAL(oapi->bar("a", {}, "c"), "[\"a\",null,\"c\"]"); BOOST_CHECK_EQUAL(oapi->bar("a", {}, "c"), "[\"a\",null,\"c\"]");
auto server = std::make_shared<fc::http::websocket_server>(); auto server = std::make_shared<fc::http::websocket_server>("");
server->on_connection([&]( const websocket_connection_ptr& c ){ server->on_connection([&]( const websocket_connection_ptr& c ){
auto wsc = std::make_shared<websocket_api_connection>(c, MAX_DEPTH); auto wsc = std::make_shared<websocket_api_connection>(c, MAX_DEPTH);
wsc->register_api(fc::api<fc::test::optionals_api>(optionals)); wsc->register_api(fc::api<fc::test::optionals_api>(optionals));

View file

@ -3,21 +3,35 @@
#include <fc/network/http/websocket.hpp> #include <fc/network/http/websocket.hpp>
#include <iostream> #include <iostream>
#include <fc/log/logger.hpp>
#include <fc/log/console_appender.hpp>
BOOST_AUTO_TEST_SUITE(fc_network) BOOST_AUTO_TEST_SUITE(fc_network)
BOOST_AUTO_TEST_CASE(websocket_test) BOOST_AUTO_TEST_CASE(websocket_test)
{ {
// set up logging
fc::appender::ptr ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
l.add_appender( ca );
fc::http::websocket_client client; fc::http::websocket_client client;
fc::http::websocket_connection_ptr s_conn, c_conn; fc::http::websocket_connection_ptr s_conn, c_conn;
int port; int port;
{ {
fc::http::websocket_server server; // even with this key set, if the remote does not provide it, it will revert to
// the remote endpoint
fc::http::websocket_server server("MyProxyHeaderKey");
server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){ server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){
s_conn = c; s_conn = c;
c->on_message_handler([&](const std::string& s){ c->on_message_handler([&](const std::string& s){
c->send_message("echo: " + s); c->send_message("echo: " + s);
}); });
c->on_http_handler([&](const std::string& s){
fc::http::reply reply;
reply.body_as_string = "http-echo: " + s;
return reply;
});
}); });
server.listen( 0 ); server.listen( 0 );
@ -48,10 +62,104 @@ BOOST_AUTO_TEST_CASE(websocket_test)
c_conn->send_message( "hello world" ); c_conn->send_message( "hello world" );
fc::usleep( fc::milliseconds(100) ); fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_EQUAL("echo: hello world", echo); BOOST_CHECK_EQUAL("echo: hello world", echo);
// Testing on_http
std::string server_endpoint_string = "127.0.0.1:" + fc::to_string(port);
fc::ip::endpoint server_endpoint = fc::ip::endpoint::from_string( server_endpoint_string );
fc::http::connection c_http_conn;
c_http_conn.connect_to( server_endpoint );
std::string url = "http://localhost:" + fc::to_string(port);
fc::http::reply reply = c_http_conn.request( "POST", url, "hello world again" );
std::string reply_body( reply.body.begin(), reply.body.end() );
BOOST_CHECK_EQUAL(200, reply.status);
BOOST_CHECK_EQUAL("http-echo: hello world again", reply_body );
} }
BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::assert_exception); BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::assert_exception);
BOOST_CHECK_THROW(client.connect( "ws://localhost:" + fc::to_string(port) ), fc::exception); BOOST_CHECK_THROW(client.connect( "ws://localhost:" + fc::to_string(port) ), fc::exception);
l.remove_appender(ca);
}
BOOST_AUTO_TEST_CASE(websocket_test_with_proxy_header)
{
// set up logging
fc::appender::ptr ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
auto old_log_level = l.get_log_level();
l.set_log_level( fc::log_level::info );
l.add_appender( ca );
fc::http::websocket_client client;
// add the proxy header element
client.append_header("MyProxyHeaderKey", "MyServer:8080");
fc::http::websocket_connection_ptr s_conn, c_conn;
int port;
{
// the server will be on the lookout for the key in the header
fc::http::websocket_server server("MyProxyHeaderKey");
server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){
s_conn = c;
c->on_message_handler([&](const std::string& s){
c->send_message("echo: " + s);
});
c->on_http_handler([&](const std::string& s){
fc::http::reply reply;
reply.body_as_string = "http-echo: " + s;
return reply;
});
});
server.listen( 0 );
port = server.get_listening_port();
server.start_accept();
std::string echo;
c_conn = client.connect( "ws://localhost:" + fc::to_string(port) );
c_conn->on_message_handler([&](const std::string& s){
echo = s;
});
c_conn->send_message( "hello world" );
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_EQUAL("echo: hello world", echo);
c_conn->send_message( "again" );
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_EQUAL("echo: again", echo);
s_conn->close(0, "test");
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::exception);
c_conn = client.connect( "ws://localhost:" + fc::to_string(port) );
c_conn->on_message_handler([&](const std::string& s){
echo = s;
});
c_conn->send_message( "hello world" );
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_EQUAL("echo: hello world", echo);
// Testing on_http
std::string server_endpoint_string = "127.0.0.1:" + fc::to_string(port);
fc::ip::endpoint server_endpoint = fc::ip::endpoint::from_string( server_endpoint_string );
fc::http::connection c_http_conn;
c_http_conn.connect_to( server_endpoint );
std::string url = "http://localhost:" + fc::to_string(port);
fc::http::header fwd("MyProxyHeaderKey", "MyServer:8080");
fc::http::headers headers {fwd};
fc::http::reply reply = c_http_conn.request( "POST", url, "hello world again", headers );
std::string reply_body( reply.body.begin(), reply.body.end() );
BOOST_CHECK_EQUAL(200, reply.status);
BOOST_CHECK_EQUAL("http-echo: hello world again", reply_body );
}
BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::assert_exception);
BOOST_CHECK_THROW(client.connect( "ws://localhost:" + fc::to_string(port) ), fc::exception);
l.remove_appender(ca);
l.set_log_level(old_log_level);
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

41
tests/ws_test_client.cpp Normal file
View file

@ -0,0 +1,41 @@
#include <fc/network/http/websocket.hpp>
#include <websocketpp/error.hpp>
#include <iostream>
#include <string>
#include <fc/log/logger.hpp>
#include <fc/log/console_appender.hpp>
int main(int argc, char** argv)
{
try
{
// set up logging
fc::appender::ptr ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
l.add_appender( ca );
fc::http::websocket_client client;
fc::http::websocket_connection_ptr s_conn, c_conn;
std::string url = argv[1];
wlog( "Connecting to server at url ${url}", ("url", url) );
c_conn = client.connect( "ws://" + url );
std::string echo;
c_conn->on_message_handler([&](const std::string& s){
echo = s;
});
c_conn->send_message( "hello world" );
fc::usleep( fc::milliseconds(500) );
if (echo != std::string("echo: hello world") )
wlog( "Test1 failed, echo value: [${echo}]", ("echo", echo) );
c_conn->send_message( "again" );
fc::usleep( fc::milliseconds(500) );
if ("echo: again" != echo)
wlog( "Test2 failed, echo value: [${echo}]", ("echo", echo) );
}
catch (const websocketpp::exception& ex)
{
elog( "websocketpp::exception thrown: ${err}", ("err", ex.what()) );
}
}

33
tests/ws_test_server.cpp Normal file
View file

@ -0,0 +1,33 @@
#include <fc/network/http/websocket.hpp>
#include <fc/asio.hpp>
#include <iostream>
#include <chrono>
#include <fc/log/logger.hpp>
#include <fc/log/console_appender.hpp>
int main(int argc, char** argv)
{
// set up logging
fc::appender::ptr ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
l.add_appender( ca );
fc::http::websocket_server server("MyForwardHeaderKey");
server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){
c->on_message_handler([&](const std::string& s){
c->send_message("echo: " + s);
});
});
server.listen( 0 );
server.start_accept();
wlog( "Port: ${port}", ("port", server.get_listening_port()) );
while( true )
{
fc::usleep( fc::microseconds(100) );
}
}