FC Updates from BitShares and myself #21

Closed
nathanielhourt wants to merge 687 commits from dapp-support into latest-fc
5 changed files with 121 additions and 54 deletions
Showing only changes of commit 8b4ef5aa02 - Show all commits

View file

@ -30,7 +30,7 @@ namespace fc { namespace http {
boost::any& get_session_data() { return _session_data; } boost::any& get_session_data() { return _session_data; }
virtual std::string get_request_header(const std::string& key) = 0; virtual std::string get_request_header(const std::string& key) = 0;
virtual std::string get_host() = 0; virtual std::string get_remote_hostname() = 0;
fc::signal<void()> closed; fc::signal<void()> closed;
private: private:
@ -45,7 +45,7 @@ namespace fc { namespace http {
class websocket_server class websocket_server
{ {
public: public:
websocket_server( std::string host_header_key = std::string() ); websocket_server();
~websocket_server(); ~websocket_server();
void on_connection( const on_connection_handler& handler); void on_connection( const on_connection_handler& handler);
@ -67,7 +67,7 @@ namespace fc { namespace http {
{ {
public: public:
websocket_tls_server( const std::string& server_pem = std::string(), websocket_tls_server( const std::string& server_pem = std::string(),
const std::string& ssl_password = std::string(), std::string host_header_key = std::string() ); const std::string& ssl_password = std::string());
~websocket_tls_server(); ~websocket_tls_server();
void on_connection( const on_connection_handler& handler); void on_connection( const on_connection_handler& handler);

View file

@ -141,11 +141,13 @@ namespace fc { namespace http {
class websocket_connection_impl : public websocket_connection class websocket_connection_impl : public websocket_connection
{ {
public: public:
websocket_connection_impl( T con, std::string host_header_key = std::string() ) websocket_connection_impl( T con )
:_ws_connection(con), host_header_key(host_header_key) { :_ws_connection(con){
} }
~websocket_connection_impl() {} ~websocket_connection_impl()
{
}
virtual void send_message( const std::string& message )override virtual void send_message( const std::string& message )override
{ {
@ -153,7 +155,6 @@ namespace fc { namespace http {
auto ec = _ws_connection->send( message ); auto ec = _ws_connection->send( message );
FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) ); FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) );
} }
virtual void close( int64_t code, const std::string& reason )override virtual void close( int64_t code, const std::string& reason )override
{ {
_ws_connection->close(code,reason); _ws_connection->close(code,reason);
@ -164,19 +165,14 @@ namespace fc { namespace http {
return _ws_connection->get_request_header(key); return _ws_connection->get_request_header(key);
} }
virtual std::string get_host() override virtual std::string get_remote_hostname()
{ {
if ( !host_header_key.empty() ) // TODO: check headers, revert to the raw connection details
{ // Notes: T is a pointer to a websocketpp::connection (see connection.hpp)
auto header = get_request_header( host_header_key ); return _ws_connection->get_uri()->get_host();
if ( !header.empty() )
return header;
}
return _ws_connection->get_host();
} }
private:
T _ws_connection; T _ws_connection;
std::string host_header_key;
}; };
typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr; typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr;
@ -184,7 +180,7 @@ namespace fc { namespace http {
class websocket_server_impl class websocket_server_impl
{ {
public: public:
websocket_server_impl( std::string host_header_key ) websocket_server_impl()
:_server_thread( fc::thread::current() ) :_server_thread( fc::thread::current() )
{ {
@ -193,8 +189,7 @@ namespace fc { namespace http {
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){ _server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto new_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>> auto new_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) );
( _server.get_con_from_hdl(hdl), host_header_key );
_on_connection( _connections[hdl] = new_con ); _on_connection( _connections[hdl] = new_con );
}).wait(); }).wait();
}); });
@ -203,14 +198,10 @@ namespace fc { namespace http {
auto current_con = _connections.find(hdl); auto current_con = _connections.find(hdl);
assert( current_con != _connections.end() ); assert( current_con != _connections.end() );
auto payload = msg->get_payload(); auto payload = msg->get_payload();
std::shared_ptr<websocket_connection> ws_con = current_con->second; std::shared_ptr<websocket_connection> con = current_con->second;
wdump( ("server") (ws_con->get_host()) (payload) ); wdump( ("server") ( con->get_remote_hostname() ) (msg->get_payload()));
++_pending_messages; ++_pending_messages;
auto f = fc::async([this,ws_con,payload]() auto f = fc::async([this,con,payload](){ if( _pending_messages ) --_pending_messages; con->on_message( payload ); });
{
if( _pending_messages ) --_pending_messages;
ws_con->on_message( payload );
});
if( _pending_messages > 100 ) if( _pending_messages > 100 )
f.wait(); f.wait();
}).wait(); }).wait();
@ -223,18 +214,17 @@ namespace fc { namespace http {
_server.set_http_handler( [&]( connection_hdl hdl ){ _server.set_http_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( auto current_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) );
_server.get_con_from_hdl(hdl), host_header_key );
_on_connection( current_con ); _on_connection( current_con );
auto con = _server.get_con_from_hdl(hdl); auto con = _server.get_con_from_hdl(hdl);
con->defer_http_response(); con->defer_http_response();
std::string request_body = con->get_request_body(); std::string request_body = con->get_request_body();
wdump( ("server") (current_con->get_host()) (request_body) ); wdump(("server")(request_body));
fc::async([current_con, request_body, con] { fc::async([current_con, request_body, con] {
std::string response = current_con->on_http(request_body); std::string response = current_con->on_http(request_body);
idump( ("server") (current_con->get_host()) (response) ); idump((response));
con->set_body( response ); con->set_body( response );
con->set_status( websocketpp::http::status_code::ok ); con->set_status( websocketpp::http::status_code::ok );
con->send_http_response(); con->send_http_response();
@ -306,9 +296,10 @@ namespace fc { namespace http {
class websocket_tls_server_impl class websocket_tls_server_impl
{ {
public: public:
websocket_tls_server_impl( const string& server_pem, const string& ssl_password, websocket_tls_server_impl( const string& server_pem, const string& ssl_password )
std::string host_header_key ) :_server_thread( fc::thread::current() ) :_server_thread( fc::thread::current() )
{ {
//if( server_pem.size() )
{ {
_server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr { _server.set_tls_init_handler( [=]( websocketpp::connection_hdl hdl ) -> context_ptr {
context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::tlsv1); context_ptr ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::tlsv1);
@ -332,8 +323,7 @@ namespace fc { namespace http {
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){ _server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto new_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( auto new_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) );
_server.get_con_from_hdl(hdl), host_header_key );
_on_connection( _connections[hdl] = new_con ); _on_connection( _connections[hdl] = new_con );
}).wait(); }).wait();
}); });
@ -342,29 +332,27 @@ namespace fc { namespace http {
auto current_con = _connections.find(hdl); auto current_con = _connections.find(hdl);
assert( current_con != _connections.end() ); assert( current_con != _connections.end() );
auto received = msg->get_payload(); auto received = msg->get_payload();
std::shared_ptr<websocket_connection> ws_con = current_con->second; std::shared_ptr<websocket_connection> con = current_con->second;
wdump( ("server") (ws_con->get_host()) (received) ); fc::async([con,received](){ con->on_message( received ); });
fc::async([ws_con,received](){ ws_con->on_message( received ); });
}).wait(); }).wait();
}); });
_server.set_http_handler( [&]( connection_hdl hdl ){ _server.set_http_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){ _server_thread.async( [&](){
auto current_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( auto current_con = std::make_shared<websocket_connection_impl<websocket_tls_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) );
_server.get_con_from_hdl(hdl), host_header_key );
try{ try{
_on_connection( current_con ); _on_connection( current_con );
auto con = _server.get_con_from_hdl(hdl); auto con = _server.get_con_from_hdl(hdl);
wdump(("server") (con->get_host()) (con->get_request_body()) ); wdump(("server")(con->get_request_body()));
auto response = current_con->on_http( con->get_request_body() ); auto response = current_con->on_http( con->get_request_body() );
idump( ("server") (current_con->get_host()) (response) ); idump((response));
con->set_body( response ); con->set_body( response );
con->set_status( websocketpp::http::status_code::ok ); con->set_status( websocketpp::http::status_code::ok );
} catch ( const fc::exception& e ) } catch ( const fc::exception& e )
{ {
edump( (current_con->get_host()) (e.to_detail_string()) ); edump((e.to_detail_string()));
} }
current_con->closed(); current_con->closed();
@ -427,7 +415,8 @@ namespace fc { namespace http {
_client.clear_access_channels( websocketpp::log::alevel::all ); _client.clear_access_channels( websocketpp::log::alevel::all );
_client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){ _client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){
_client_thread.async( [&](){ _client_thread.async( [&](){
wdump( (_connection->get_host()) (msg->get_payload()) ); wdump((msg->get_payload()));
//std::cerr<<"recv: "<<msg->get_payload()<<"\n";
auto received = msg->get_payload(); auto received = msg->get_payload();
fc::async( [=](){ fc::async( [=](){
if( _connection ) if( _connection )
@ -487,8 +476,8 @@ namespace fc { namespace http {
_client.clear_access_channels( websocketpp::log::alevel::all ); _client.clear_access_channels( websocketpp::log::alevel::all );
_client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){ _client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){
_client_thread.async( [&](){ _client_thread.async( [&](){
wdump( (_connection->get_host()) (msg->get_payload()) ); wdump((msg->get_payload()));
_connection->on_message( msg->get_payload() ); _connection->on_message( msg->get_payload() );
}).wait(); }).wait();
}); });
_client.set_close_handler( [=]( connection_hdl hdl ){ _client.set_close_handler( [=]( connection_hdl hdl ){
@ -595,8 +584,7 @@ namespace fc { namespace http {
} // namespace detail } // namespace detail
websocket_server::websocket_server( std::string host_header_key ) websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {}
:my( new detail::websocket_server_impl( host_header_key ) ) {}
websocket_server::~websocket_server(){} websocket_server::~websocket_server(){}
void websocket_server::on_connection( const on_connection_handler& handler ) void websocket_server::on_connection( const on_connection_handler& handler )
@ -636,9 +624,7 @@ namespace fc { namespace http {
websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password, websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password ):my( new detail::websocket_tls_server_impl(server_pem, ssl_password) ) {}
std::string host_header_key )
: my( new detail::websocket_tls_server_impl(server_pem, ssl_password, host_header_key ) ) {}
websocket_tls_server::~websocket_tls_server(){} websocket_tls_server::~websocket_tls_server(){}
void websocket_tls_server::on_connection( const on_connection_handler& handler ) void websocket_tls_server::on_connection( const on_connection_handler& handler )
@ -690,7 +676,7 @@ namespace fc { namespace http {
auto con = my->_client.get_connection( uri, ec ); auto con = my->_client.get_connection( uri, ec );
if( ec ) FC_ASSERT( !ec, "uri: ${uri} error: ${e}", ("uri", uri) ("e",ec.message()) ); if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
my->_client.connect(con); my->_client.connect(con);
my->_connected->wait(); my->_connected->wait();
@ -717,7 +703,7 @@ namespace fc { namespace http {
auto con = smy->_client.get_connection( uri, ec ); auto con = smy->_client.get_connection( uri, ec );
if( ec ) if( ec )
FC_ASSERT( !ec, "uri: ${uri} error: ${e}", ("uri", uri) ("e",ec.message()) ); FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
smy->_client.connect(con); smy->_client.connect(con);
smy->_connected->wait(); smy->_connected->wait();
return smy->_connection; return smy->_connection;
@ -738,6 +724,7 @@ namespace fc { namespace http {
websocket_connection_ptr websocket_tls_client::connect( const std::string& uri ) websocket_connection_ptr websocket_tls_client::connect( const std::string& uri )
{ try { { try {
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") ); my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") );
@ -752,7 +739,7 @@ namespace fc { namespace http {
auto con = my->_client.get_connection( uri, ec ); auto con = my->_client.get_connection( uri, ec );
if( ec ) if( ec )
{ {
FC_ASSERT( !ec, "uri: ${uri} error: ${e}", ("uri", uri) ("e",ec.message()) ); FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
} }
my->_client.connect(con); my->_client.connect(con);
my->_connected->wait(); my->_connected->wait();

View file

@ -19,6 +19,12 @@ target_link_libraries( hmac_test fc )
add_executable( ecc_test crypto/ecc_test.cpp ) add_executable( ecc_test crypto/ecc_test.cpp )
target_link_libraries( ecc_test fc ) target_link_libraries( ecc_test fc )
add_executable( ws_test_server ws_test_server.cpp )
target_link_libraries( ws_test_server fc )
add_executable( ws_test_client ws_test_client.cpp )
target_link_libraries( ws_test_client fc )
#add_executable( test_aes aes_test.cpp ) #add_executable( test_aes aes_test.cpp )
#target_link_libraries( test_aes fc ${rt_library} ${pthread_library} ) #target_link_libraries( test_aes fc ${rt_library} ${pthread_library} )
#add_executable( test_sleep sleep.cpp ) #add_executable( test_sleep sleep.cpp )

41
tests/ws_test_client.cpp Normal file
View file

@ -0,0 +1,41 @@
#include <fc/network/http/websocket.hpp>
#include <websocketpp/error.hpp>
#include <iostream>
#include <string>
#include <fc/log/logger.hpp>
#include <fc/log/console_appender.hpp>
int main(int argc, char** argv)
{
try
{
// set up logging
fc::shared_ptr<fc::console_appender> ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
l.add_appender( ca );
fc::http::websocket_client client;
fc::http::websocket_connection_ptr s_conn, c_conn;
int port = std::stoi(argv[1]);
wlog( "Connecting to server at port ${port}", ("port", argv[1]) );
c_conn = client.connect( "ws://127.0.0.1:" + fc::to_string(port) );
std::string echo;
c_conn->on_message_handler([&](const std::string& s){
echo = s;
});
c_conn->send_message( "hello world" );
fc::usleep( fc::milliseconds(500) );
if (echo != std::string("echo: hello world") )
wlog( "Test1 failed, echo value: [${echo}]", ("echo", echo) );
c_conn->send_message( "again" );
fc::usleep( fc::milliseconds(500) );
if ("echo: again" != echo)
wlog( "Test2 failed, echo value: [${echo}]", ("echo", echo) );
}
catch (const websocketpp::exception& ex)
{
elog( "websocketpp::exception thrown: ${err}", ("err", ex.what()) );
}
}

33
tests/ws_test_server.cpp Normal file
View file

@ -0,0 +1,33 @@
#include <fc/network/http/websocket.hpp>
#include <fc/asio.hpp>
#include <iostream>
#include <chrono>
#include <fc/log/logger.hpp>
#include <fc/log/console_appender.hpp>
int main(int argc, char** argv)
{
// set up logging
fc::shared_ptr<fc::console_appender> ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
l.add_appender( ca );
fc::http::websocket_server server;
server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){
c->on_message_handler([&](const std::string& s){
c->send_message("echo: " + s);
});
});
server.listen( 0 );
server.start_accept();
wlog( "Port: ${port}", ("port", server.get_listening_port()) );
while( true )
{
fc::usleep( fc::microseconds(100) );
}
}