Refactor websocket_client, fix duplicate code

Fix duplicate code issue in websocket_client::connect() and
websocket_client::secure_connect(), and other minor issues
This commit is contained in:
abitmore 2020-05-02 11:17:07 -04:00
parent c00824a137
commit bee73bd491
2 changed files with 62 additions and 59 deletions

View file

@ -101,7 +101,7 @@ namespace fc { namespace http {
private:
std::unique_ptr<detail::websocket_client_impl> my;
std::unique_ptr<detail::websocket_tls_client_impl> smy;
std::vector<std::pair<std::string,std::string>> headers;
fc::http::headers _headers;
};
} }

View file

@ -499,16 +499,32 @@ namespace fc { namespace http {
}).wait();
});
_client.set_close_handler( [=]( connection_hdl hdl ){
_client_thread.async( [&](){ if( _connection ) {_connection->closed(); _connection.reset();} } ).wait();
if( _closed ) _closed->set_value();
_client_thread.async( [&](){
if( _connection ) {
_connection->closed();
_connection.reset();
}
} ).wait();
if( _closed )
_closed->set_value();
});
_client.set_fail_handler( [=]( connection_hdl hdl ){
auto con = _client.get_con_from_hdl(hdl);
auto message = con->get_ec().message();
if( _connection )
_client_thread.async( [&](){ if( _connection ) _connection->closed(); _connection.reset(); } ).wait();
{
_client_thread.async( [&](){
if( _connection ) {
_connection->closed();
_connection.reset();
}
} ).wait();
}
if( _connected && !_connected->ready() )
_connected->set_exception( exception_ptr( new FC_EXCEPTION( exception, "${message}", ("message",message)) ) );
{
_connected->set_exception( exception_ptr(
new FC_EXCEPTION( exception, "${message}", ("message",message)) ) );
}
if( _closed )
_closed->set_value();
});
@ -525,6 +541,37 @@ namespace fc { namespace http {
if( _closed )
_closed->wait();
}
websocket_connection_ptr connect( const std::string& uri, const fc::http::headers& headers )
{
websocketpp::lib::error_code ec;
_uri = uri;
_connected = promise<void>::create("websocket::connect");
_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
_hdl = hdl;
auto con = _client.get_con_from_hdl(hdl);
_connection = std::make_shared<websocket_connection_impl<
typename websocketpp::client<T>::connection_ptr>>( con );
_closed = promise<void>::create("websocket::closed");
_connected->set_value();
});
auto con = _client.get_connection( uri, ec );
for( const fc::http::header& h : headers )
{
con->append_header( h.key, h.val );
}
FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
_client.connect(con);
_connected->wait();
return _connection;
}
fc::promise<void>::ptr _connected;
fc::promise<void>::ptr _closed;
fc::thread& _client_thread;
@ -685,66 +732,22 @@ namespace fc { namespace http {
websocket_connection_ptr websocket_client::connect( const std::string& uri )
{ try {
if( uri.substr(0,4) == "wss:" )
return secure_connect(uri);
FC_ASSERT( uri.substr(0,3) == "ws:" );
websocketpp::lib::error_code ec;
FC_ASSERT( uri.substr(0,4) == "wss:" || uri.substr(0,3) == "ws:", "Unsupported protocol" );
my->_uri = uri;
my->_connected = promise<void>::create("websocket::connect");
// WSS
if( uri.substr(0,4) == "wss:" )
return smy->connect( uri, _headers );
my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
my->_hdl = hdl;
auto con = my->_client.get_con_from_hdl(hdl);
my->_connection = std::make_shared<detail::websocket_connection_impl<
detail::websocket_client_connection_type>>( con );
my->_closed = promise<void>::create("websocket::closed");
my->_connected->set_value();
});
// WS
return my->connect( uri, _headers );
auto con = my->_client.get_connection( uri, ec );
std::for_each(headers.begin(), headers.end(), [con](std::pair<std::string, std::string> in) {
con->append_header(in.first, in.second);
});
if( ec ) FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
my->_client.connect(con);
my->_connected->wait();
return my->_connection;
} FC_CAPTURE_AND_RETHROW( (uri) ) }
// TODO most code in this function is same as ::connect, best refactor
websocket_connection_ptr websocket_client::secure_connect( const std::string& uri )
{ try {
if( uri.substr(0,3) == "ws:" )
return connect(uri);
FC_ASSERT( uri.substr(0,4) == "wss:" );
websocketpp::lib::error_code ec;
smy->_uri = uri;
smy->_connected = promise<void>::create("websocket::connect");
smy->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
smy->_hdl = hdl;
auto con = smy->_client.get_con_from_hdl(hdl);
smy->_connection = std::make_shared<detail::websocket_connection_impl<
detail::websocket_tls_client_connection_type>>( con );
smy->_closed = promise<void>::create("websocket::closed");
smy->_connected->set_value();
});
auto con = smy->_client.get_connection( uri, ec );
std::for_each(headers.begin(), headers.end(), [con](std::pair<std::string, std::string> in) {
con->append_header(in.first, in.second);
});
if( ec )
FC_ASSERT( !ec, "error: ${e}", ("e",ec.message()) );
smy->_client.connect(con);
smy->_connected->wait();
return smy->_connection;
} FC_CAPTURE_AND_RETHROW( (uri) ) }
{
return connect( uri );
}
void websocket_client::close()
{
@ -761,7 +764,7 @@ namespace fc { namespace http {
void websocket_client::append_header(const std::string& key, const std::string& value)
{
headers.push_back( std::make_pair(key, value) );
_headers.emplace_back( key, value );
}
} } // fc::http