Updates from BitShares FC #22

Closed
nathanielhourt wants to merge 693 commits from dapp-support into latest-fc
3 changed files with 63 additions and 70 deletions
Showing only changes of commit 82935acd86 - Show all commits

View file

@ -30,6 +30,7 @@ namespace fc { namespace http {
boost::any& get_session_data() { return _session_data; } boost::any& get_session_data() { return _session_data; }
virtual std::string get_request_header(const std::string& key) = 0; virtual std::string get_request_header(const std::string& key) = 0;
virtual std::string get_host() = 0;
fc::signal<void()> closed; fc::signal<void()> closed;
private: private:
@ -44,7 +45,7 @@ namespace fc { namespace http {
class websocket_server class websocket_server
{ {
public: public:
websocket_server(); websocket_server( std::string host_header_key = std::string() );
~websocket_server(); ~websocket_server();
void on_connection( const on_connection_handler& handler); void on_connection( const on_connection_handler& handler);
@ -66,7 +67,7 @@ namespace fc { namespace http {
{ {
public: public:
websocket_tls_server( const std::string& server_pem = std::string(), websocket_tls_server( const std::string& server_pem = std::string(),
const std::string& ssl_password = std::string()); const std::string& ssl_password = std::string(), std::string host_header_key = std::string() );
~websocket_tls_server(); ~websocket_tls_server();
void on_connection( const on_connection_handler& handler); void on_connection( const on_connection_handler& handler);

View file

@ -40,14 +40,6 @@ namespace fc { namespace http {
typedef base::con_msg_manager_type con_msg_manager_type; typedef base::con_msg_manager_type con_msg_manager_type;
typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; typedef base::endpoint_msg_manager_type endpoint_msg_manager_type;
/// Custom Logging policies
/*typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::elevel> elog_type;
typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::alevel> alog_type;
*/
//typedef base::alog_type alog_type;
//typedef base::elog_type elog_type;
typedef websocketpp::log::stub elog_type; typedef websocketpp::log::stub elog_type;
typedef websocketpp::log::stub alog_type; typedef websocketpp::log::stub alog_type;
@ -91,14 +83,6 @@ namespace fc { namespace http {
typedef base::con_msg_manager_type con_msg_manager_type; typedef base::con_msg_manager_type con_msg_manager_type;
typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; typedef base::endpoint_msg_manager_type endpoint_msg_manager_type;
/// Custom Logging policies
/*typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::elevel> elog_type;
typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::alevel> alog_type;
*/
//typedef base::alog_type alog_type;
//typedef base::elog_type elog_type;
typedef websocketpp::log::stub elog_type; typedef websocketpp::log::stub elog_type;
typedef websocketpp::log::stub alog_type; typedef websocketpp::log::stub alog_type;
@ -131,8 +115,6 @@ namespace fc { namespace http {
typedef base::con_msg_manager_type con_msg_manager_type; typedef base::con_msg_manager_type con_msg_manager_type;
typedef base::endpoint_msg_manager_type endpoint_msg_manager_type; typedef base::endpoint_msg_manager_type endpoint_msg_manager_type;
//typedef base::alog_type alog_type;
//typedef base::elog_type elog_type;
typedef websocketpp::log::stub elog_type; typedef websocketpp::log::stub elog_type;
typedef websocketpp::log::stub alog_type; typedef websocketpp::log::stub alog_type;
@ -151,10 +133,6 @@ namespace fc { namespace http {
transport_type; transport_type;
}; };
using websocketpp::connection_hdl; using websocketpp::connection_hdl;
typedef websocketpp::server<asio_with_stub_log> websocket_server_type; typedef websocketpp::server<asio_with_stub_log> websocket_server_type;
typedef websocketpp::server<asio_tls_stub_log> websocket_tls_server_type; typedef websocketpp::server<asio_tls_stub_log> websocket_tls_server_type;
@ -163,21 +141,19 @@ namespace fc { namespace http {
class websocket_connection_impl : public websocket_connection class websocket_connection_impl : public websocket_connection
{ {
public: public:
websocket_connection_impl( T con ) websocket_connection_impl( T con, std::string host_header_key = std::string() )
:_ws_connection(con){ :_ws_connection(con), host_header_key(host_header_key) {
} }
~websocket_connection_impl() ~websocket_connection_impl() {}
{
}
virtual void send_message( const std::string& message )override virtual void send_message( const std::string& message )override
{ {
idump((message)); idump((message));
//std::cerr<<"send: "<<message<<"\n";
auto ec = _ws_connection->send( message ); auto ec = _ws_connection->send( message );
FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) ); FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) );
} }
virtual void close( int64_t code, const std::string& reason )override virtual void close( int64_t code, const std::string& reason )override
{ {
_ws_connection->close(code,reason); _ws_connection->close(code,reason);
@ -188,7 +164,19 @@ namespace fc { namespace http {
return _ws_connection->get_request_header(key); return _ws_connection->get_request_header(key);
} }
virtual std::string get_host() override
{
if ( !host_header_key.empty() )
{
auto header = get_request_header( host_header_key );
if ( !header.empty() )
return header;
}
return _ws_connection->get_host();
}
private:
T _ws_connection; T _ws_connection;
std::string host_header_key;
}; };
typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr; typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr;
@ -196,7 +184,7 @@ namespace fc { namespace http {
class websocket_server_impl class websocket_server_impl
{ {
public: public:
websocket_server_impl() websocket_server_impl( std::string host_header_key )
:_server_thread( fc::thread::current() ) :_server_thread( fc::thread::current() )
{ {
@ -205,7 +193,8 @@ namespace fc { namespace http {
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){ _server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto new_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto new_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>
( _server.get_con_from_hdl(hdl), host_header_key );
_on_connection( _connections[hdl] = new_con ); _on_connection( _connections[hdl] = new_con );
}).wait(); }).wait();
}); });
@ -213,11 +202,15 @@ namespace fc { namespace http {
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = _connections.find(hdl); auto current_con = _connections.find(hdl);
assert( current_con != _connections.end() ); assert( current_con != _connections.end() );
wdump(("server")(msg->get_payload()));
auto payload = msg->get_payload(); auto payload = msg->get_payload();
std::shared_ptr<websocket_connection> con = current_con->second; std::shared_ptr<websocket_connection> ws_con = current_con->second;
wdump( ("server") (ws_con->get_host()) (payload) );
++_pending_messages; ++_pending_messages;
auto f = fc::async([this,con,payload](){ if( _pending_messages ) --_pending_messages; con->on_message( payload ); }); auto f = fc::async([this,ws_con,payload]()
{
if( _pending_messages ) --_pending_messages;
ws_con->on_message( payload );
});
if( _pending_messages > 100 ) if( _pending_messages > 100 )
f.wait(); f.wait();
}).wait(); }).wait();
@ -230,17 +223,18 @@ namespace fc { namespace http {
_server.set_http_handler( [&]( connection_hdl hdl ){ _server.set_http_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto current_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>(
_server.get_con_from_hdl(hdl), host_header_key );
_on_connection( current_con ); _on_connection( current_con );
auto con = _server.get_con_from_hdl(hdl); auto con = _server.get_con_from_hdl(hdl);
con->defer_http_response(); con->defer_http_response();
std::string request_body = con->get_request_body(); std::string request_body = con->get_request_body();
wdump(("server")(request_body)); wdump( ("server") (current_con->get_host()) (request_body) );
fc::async([current_con, request_body, con] { fc::async([current_con, request_body, con] {
std::string response = current_con->on_http(request_body); std::string response = current_con->on_http(request_body);
idump((response)); idump( ("server") (current_con->get_host()) (response) );
con->set_body( response ); con->set_body( response );
con->set_status( websocketpp::http::status_code::ok ); con->set_status( websocketpp::http::status_code::ok );
con->send_http_response(); con->send_http_response();
@ -312,10 +306,9 @@ namespace fc { namespace http {
class websocket_tls_server_impl class websocket_tls_server_impl
{ {
public: public:
websocket_tls_server_impl( const string& server_pem, const string& ssl_password ) websocket_tls_server_impl( const string& server_pem, const string& ssl_password,
:_server_thread( fc::thread::current() ) std::string host_header_key ) :_server_thread( fc::thread::current() )
{ {
//if( server_pem.size() )
{ {
_server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr { _server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr {
context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::tlsv1); context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::tlsv1);
@ -339,7 +332,8 @@ namespace fc { namespace http {
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){ _server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto new_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto new_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>(
_server.get_con_from_hdl(hdl), host_header_key );
_on_connection( _connections[hdl] = new_con ); _on_connection( _connections[hdl] = new_con );
}).wait(); }).wait();
}); });
@ -348,27 +342,29 @@ namespace fc { namespace http {
auto current_con = _connections.find(hdl); auto current_con = _connections.find(hdl);
assert( current_con != _connections.end() ); assert( current_con != _connections.end() );
auto received = msg->get_payload(); auto received = msg->get_payload();
std::shared_ptr<websocket_connection> con = current_con->second; std::shared_ptr<websocket_connection> ws_con = current_con->second;
fc::async([con,received](){ con->on_message( received ); }); wdump( ("server") (ws_con->get_host()) (received) );
fc::async([ws_con,received](){ ws_con->on_message( received ); });
}).wait(); }).wait();
}); });
_server.set_http_handler( [&]( connection_hdl hdl ){ _server.set_http_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) ); auto current_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>(
_server.get_con_from_hdl(hdl), host_header_key );
try{ try{
_on_connection( current_con ); _on_connection( current_con );
auto con = _server.get_con_from_hdl(hdl); auto con = _server.get_con_from_hdl(hdl);
wdump(("server")(con->get_request_body())); wdump(("server") (con->get_host()) (con->get_request_body()) );
auto response = current_con->on_http( con->get_request_body() ); auto response = current_con->on_http( con->get_request_body() );
idump((response)); idump( ("server") (current_con->get_host()) (response) );
con->set_body( response ); con->set_body( response );
con->set_status( websocketpp::http::status_code::ok ); con->set_status( websocketpp::http::status_code::ok );
} catch ( const fc::exception& e ) } catch ( const fc::exception& e )
{ {
edump((e.to_detail_string())); edump( (current_con->get_host()) (e.to_detail_string()) );
} }
current_con->closed(); current_con->closed();
@ -413,18 +409,6 @@ namespace fc { namespace http {
fc::promise<void>::ptr _closed; fc::promise<void>::ptr _closed;
}; };
typedef websocketpp::client<asio_with_stub_log> websocket_client_type; typedef websocketpp::client<asio_with_stub_log> websocket_client_type;
typedef websocketpp::client<asio_tls_stub_log> websocket_tls_client_type; typedef websocketpp::client<asio_tls_stub_log> websocket_tls_client_type;
@ -443,8 +427,7 @@ namespace fc { namespace http {
_client.clear_access_channels( websocketpp::log::alevel::all ); _client.clear_access_channels( websocketpp::log::alevel::all );
_client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){ _client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){
_client_thread.async( [&](){ _client_thread.async( [&](){
wdump((msg->get_payload())); wdump( (_connection->get_host()) (msg->get_payload()) );
//std::cerr<<"recv: "<<msg->get_payload()<<"\n";
auto received = msg->get_payload(); auto received = msg->get_payload();
fc::async( [=](){ fc::async( [=](){
if( _connection ) if( _connection )
@ -504,8 +487,8 @@ namespace fc { namespace http {
_client.clear_access_channels( websocketpp::log::alevel::all ); _client.clear_access_channels( websocketpp::log::alevel::all );
_client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){ _client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){
_client_thread.async( [&](){ _client_thread.async( [&](){
wdump((msg->get_payload())); wdump( (_connection->get_host()) (msg->get_payload()) );
_connection->on_message( msg->get_payload() ); _connection->on_message( msg->get_payload() );
}).wait(); }).wait();
}); });
_client.set_close_handler( [=]( connection_hdl hdl ){ _client.set_close_handler( [=]( connection_hdl hdl ){
@ -612,7 +595,8 @@ namespace fc { namespace http {
} // namespace detail } // namespace detail
websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {} websocket_server::websocket_server( std::string host_header_key )
:my( new detail::websocket_server_impl( host_header_key ) ) {}
websocket_server::~websocket_server(){} websocket_server::~websocket_server(){}
void websocket_server::on_connection( const on_connection_handler& handler ) void websocket_server::on_connection( const on_connection_handler& handler )
@ -652,7 +636,9 @@ namespace fc { namespace http {
websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password ):my( new detail::websocket_tls_server_impl(server_pem, ssl_password) ) {} websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password,
std::string host_header_key )
: my( new detail::websocket_tls_server_impl(server_pem, ssl_password, host_header_key ) ) {}
websocket_tls_server::~websocket_tls_server(){} websocket_tls_server::~websocket_tls_server(){}
void websocket_tls_server::on_connection( const on_connection_handler& handler ) void websocket_tls_server::on_connection( const on_connection_handler& handler )
@ -704,7 +690,7 @@ namespace fc { namespace http {
auto con = my->_client.get_connection( uri, ec ); auto con = my->_client.get_connection( uri, ec );
if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); if( ec ) FC_ASSERT( !ec, "uri: ${uri} error: ${e}", ("uri", uri) ("e",ec.message()) );
my->_client.connect(con); my->_client.connect(con);
my->_connected->wait(); my->_connected->wait();
@ -731,7 +717,7 @@ namespace fc { namespace http {
auto con = smy->_client.get_connection( uri, ec ); auto con = smy->_client.get_connection( uri, ec );
if( ec ) if( ec )
FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); FC_ASSERT( !ec, "uri: ${uri} error: ${e}", ("uri", uri) ("e",ec.message()) );
smy->_client.connect(con); smy->_client.connect(con);
smy->_connected->wait(); smy->_connected->wait();
return smy->_connection; return smy->_connection;
@ -752,7 +738,6 @@ namespace fc { namespace http {
websocket_connection_ptr websocket_tls_client::connect( const std::string& uri ) websocket_connection_ptr websocket_tls_client::connect( const std::string& uri )
{ try { { try {
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") ); my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") );
@ -767,7 +752,7 @@ namespace fc { namespace http {
auto con = my->_client.get_connection( uri, ec ); auto con = my->_client.get_connection( uri, ec );
if( ec ) if( ec )
{ {
FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); FC_ASSERT( !ec, "uri: ${uri} error: ${e}", ("uri", uri) ("e",ec.message()) );
} }
my->_client.connect(con); my->_client.connect(con);
my->_connected->wait(); my->_connected->wait();

View file

@ -3,11 +3,18 @@
#include <fc/network/http/websocket.hpp> #include <fc/network/http/websocket.hpp>
#include <iostream> #include <iostream>
#include <fc/log/logger.hpp>
#include <fc/log/console_appender.hpp>
BOOST_AUTO_TEST_SUITE(fc_network) BOOST_AUTO_TEST_SUITE(fc_network)
BOOST_AUTO_TEST_CASE(websocket_test) BOOST_AUTO_TEST_CASE(websocket_test)
{ {
// set up logging
fc::shared_ptr<fc::console_appender> ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
l.add_appender( ca );
fc::http::websocket_client client; fc::http::websocket_client client;
fc::http::websocket_connection_ptr s_conn, c_conn; fc::http::websocket_connection_ptr s_conn, c_conn;
int port; int port;