Allow configurable proxy header for logging

This commit is contained in:
John Jones 2019-05-24 10:41:21 -05:00
parent d821e865ed
commit 4906d8b2da
5 changed files with 90 additions and 15 deletions

View file

@ -91,9 +91,11 @@ namespace fc { namespace http {
void close(); void close();
void synchronous_close(); void synchronous_close();
void append_header(const std::string& key, const std::string& value);
private: private:
std::unique_ptr<detail::websocket_client_impl> my; std::unique_ptr<detail::websocket_client_impl> my;
std::unique_ptr<detail::websocket_tls_client_impl> smy; std::unique_ptr<detail::websocket_tls_client_impl> smy;
std::vector<std::pair<std::string,std::string>> headers;
}; };
class websocket_tls_client class websocket_tls_client
{ {

View file

@ -99,8 +99,12 @@ namespace fc {
void logger::add_appender( const fc::shared_ptr<appender>& a ) void logger::add_appender( const fc::shared_ptr<appender>& a )
{ my->_appenders.push_back(a); } { my->_appenders.push_back(a); }
// void logger::remove_appender( const fc::shared_ptr<appender>& a ) void logger::remove_appender( const fc::shared_ptr<fc::appender>& a )
// { my->_appenders.erase(a); } {
auto item = std::find(my->_appenders.begin(), my->_appenders.end(), a);
if (item != my->_appenders.end())
my->_appenders.erase(item);
}
std::vector<fc::shared_ptr<appender> > logger::get_appenders()const std::vector<fc::shared_ptr<appender> > logger::get_appenders()const
{ {

View file

@ -165,9 +165,14 @@ namespace fc { namespace http {
return _ws_connection->get_request_header(key); return _ws_connection->get_request_header(key);
} }
/****
* @brief retrieves the remote hostname
*
* @param forward_header_key the key to look at in the request header
* @returns the value in the header, otherwise the remote endpoint
*/
virtual std::string get_remote_hostname(const std::string& forward_header_key) virtual std::string get_remote_hostname(const std::string& forward_header_key)
{ {
// TODO: check headers, revert to the raw connection details
if (!forward_header_key.empty()) if (!forward_header_key.empty())
{ {
std::string header_value = _ws_connection->get_request_header(forward_header_key); std::string header_value = _ws_connection->get_request_header(forward_header_key);
@ -186,9 +191,8 @@ namespace fc { namespace http {
{ {
public: public:
websocket_server_impl(const std::string& forward_header_key = std::string() ) websocket_server_impl(const std::string& forward_header_key = std::string() )
:_server_thread( fc::thread::current() ) :_server_thread( fc::thread::current() ), fwd_header_key(forward_header_key)
{ {
_server.clear_access_channels( websocketpp::log::alevel::all ); _server.clear_access_channels( websocketpp::log::alevel::all );
_server.init_asio(&fc::asio::default_io_service()); _server.init_asio(&fc::asio::default_io_service());
_server.set_reuse_addr(true); _server.set_reuse_addr(true);
@ -205,7 +209,7 @@ namespace fc { namespace http {
auto payload = msg->get_payload(); auto payload = msg->get_payload();
std::shared_ptr<websocket_connection> con = current_con->second; std::shared_ptr<websocket_connection> con = current_con->second;
wlog( "Websocket Server Remote: ${host} Payload: ${body}", wlog( "Websocket Server Remote: ${host} Payload: ${body}",
("host", con->get_remote_hostname(forward_header_key)) ("body", msg->get_payload())); ("host", con->get_remote_hostname(fwd_header_key)) ("body", msg->get_payload()));
++_pending_messages; ++_pending_messages;
auto f = fc::async([this,con,payload](){ if( _pending_messages ) --_pending_messages; con->on_message( payload ); }); auto f = fc::async([this,con,payload](){ if( _pending_messages ) --_pending_messages; con->on_message( payload ); });
if( _pending_messages > 100 ) if( _pending_messages > 100 )
@ -297,6 +301,7 @@ namespace fc { namespace http {
on_connection_handler _on_connection; on_connection_handler _on_connection;
fc::promise<void>::ptr _closed; fc::promise<void>::ptr _closed;
uint32_t _pending_messages = 0; uint32_t _pending_messages = 0;
std::string fwd_header_key;
}; };
class websocket_tls_server_impl class websocket_tls_server_impl
@ -422,7 +427,6 @@ namespace fc { namespace http {
_client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){ _client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){
_client_thread.async( [&](){ _client_thread.async( [&](){
wdump((msg->get_payload())); wdump((msg->get_payload()));
//std::cerr<<"recv: "<<msg->get_payload()<<"\n";
auto received = msg->get_payload(); auto received = msg->get_payload();
fc::async( [=](){ fc::async( [=](){
if( _connection ) if( _connection )
@ -667,7 +671,6 @@ namespace fc { namespace http {
return secure_connect(uri); return secure_connect(uri);
FC_ASSERT( uri.substr(0,3) == "ws:" ); FC_ASSERT( uri.substr(0,3) == "ws:" );
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
my->_uri = uri; my->_uri = uri;
@ -683,6 +686,9 @@ namespace fc { namespace http {
auto con = my->_client.get_connection( uri, ec ); auto con = my->_client.get_connection( uri, ec );
std::for_each(headers.begin(), headers.end(), [con](std::pair<std::string, std::string> in) {
con->append_header(in.first, in.second);
});
if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) ); if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
my->_client.connect(con); my->_client.connect(con);
@ -695,7 +701,6 @@ namespace fc { namespace http {
if( uri.substr(0,3) == "ws:" ) if( uri.substr(0,3) == "ws:" )
return connect(uri); return connect(uri);
FC_ASSERT( uri.substr(0,4) == "wss:" ); FC_ASSERT( uri.substr(0,4) == "wss:" );
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
smy->_uri = uri; smy->_uri = uri;
@ -729,6 +734,11 @@ namespace fc { namespace http {
my->_closed->wait(); my->_closed->wait();
} }
void websocket_client::append_header(const std::string& key, const std::string& value)
{
headers.push_back( std::pair<std::string,std::string>(key, value));
}
websocket_connection_ptr websocket_tls_client::connect( const std::string& uri ) websocket_connection_ptr websocket_tls_client::connect( const std::string& uri )
{ try { { try {
// wlog( "connecting to ${uri}", ("uri",uri)); // wlog( "connecting to ${uri}", ("uri",uri));

View file

@ -59,6 +59,65 @@ BOOST_AUTO_TEST_CASE(websocket_test)
BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::assert_exception); BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::assert_exception);
BOOST_CHECK_THROW(client.connect( "ws://localhost:" + fc::to_string(port) ), fc::exception); BOOST_CHECK_THROW(client.connect( "ws://localhost:" + fc::to_string(port) ), fc::exception);
l.remove_appender(ca);
}
BOOST_AUTO_TEST_CASE(websocket_test_with_proxy_header)
{
// set up logging
fc::shared_ptr<fc::console_appender> ca(new fc::console_appender);
fc::logger l = fc::logger::get("rpc");
l.add_appender( ca );
fc::http::websocket_client client;
// add the proxy header element
client.append_header("MyProxyHeaderKey", "MyServer:8080");
fc::http::websocket_connection_ptr s_conn, c_conn;
int port;
{
// the server will be on the lookout for the key in the header
fc::http::websocket_server server("MyProxyHeaderKey");
server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){
s_conn = c;
c->on_message_handler([&](const std::string& s){
c->send_message("echo: " + s);
});
});
server.listen( 0 );
port = server.get_listening_port();
server.start_accept();
std::string echo;
c_conn = client.connect( "ws://localhost:" + fc::to_string(port) );
c_conn->on_message_handler([&](const std::string& s){
echo = s;
});
c_conn->send_message( "hello world" );
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_EQUAL("echo: hello world", echo);
c_conn->send_message( "again" );
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_EQUAL("echo: again", echo);
s_conn->close(0, "test");
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::exception);
c_conn = client.connect( "ws://localhost:" + fc::to_string(port) );
c_conn->on_message_handler([&](const std::string& s){
echo = s;
});
c_conn->send_message( "hello world" );
fc::usleep( fc::milliseconds(100) );
BOOST_CHECK_EQUAL("echo: hello world", echo);
}
BOOST_CHECK_THROW(c_conn->send_message( "again" ), fc::assert_exception);
BOOST_CHECK_THROW(client.connect( "ws://localhost:" + fc::to_string(port) ), fc::exception);
l.remove_appender(ca);
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

View file

@ -13,7 +13,7 @@ int main(int argc, char** argv)
fc::logger l = fc::logger::get("rpc"); fc::logger l = fc::logger::get("rpc");
l.add_appender( ca ); l.add_appender( ca );
fc::http::websocket_server server; fc::http::websocket_server server("MyForwardHeaderKey");
server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){ server.on_connection([&]( const fc::http::websocket_connection_ptr& c ){
c->on_message_handler([&](const std::string& s){ c->on_message_handler([&](const std::string& s){