Updates from BitShares FC #22
1 changed files with 95 additions and 95 deletions
|
|
@ -243,112 +243,112 @@ namespace fc { namespace http {
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
generic_websocket_server_impl( const std::string& forward_header_key )
|
generic_websocket_server_impl( const std::string& forward_header_key )
|
||||||
:_server_thread( fc::thread::current() ), _forward_header_key(forward_header_key)
|
:_server_thread( fc::thread::current() ), _forward_header_key( forward_header_key )
|
||||||
{
|
{
|
||||||
_server.clear_access_channels( websocketpp::log::alevel::all );
|
_server.clear_access_channels( websocketpp::log::alevel::all );
|
||||||
_server.init_asio(&fc::asio::default_io_service());
|
_server.init_asio( &fc::asio::default_io_service() );
|
||||||
_server.set_reuse_addr(true);
|
_server.set_reuse_addr( true );
|
||||||
_server.set_open_handler( [this]( connection_hdl hdl ){
|
_server.set_open_handler( [this]( connection_hdl hdl ){
|
||||||
_server_thread.async( [this, hdl](){
|
_server_thread.async( [this, hdl](){
|
||||||
auto new_con = std::make_shared<possibly_proxied_websocket_connection<
|
auto new_con = std::make_shared<possibly_proxied_websocket_connection<
|
||||||
typename websocketpp::server<T>::connection_ptr>>( _server.get_con_from_hdl(hdl),
|
typename websocketpp::server<T>::connection_ptr>>( _server.get_con_from_hdl(hdl),
|
||||||
_forward_header_key );
|
_forward_header_key );
|
||||||
_on_connection( _connections[hdl] = new_con );
|
_on_connection( _connections[hdl] = new_con );
|
||||||
}).wait();
|
}).wait();
|
||||||
});
|
});
|
||||||
_server.set_message_handler( [this]( connection_hdl hdl,
|
_server.set_message_handler( [this]( connection_hdl hdl,
|
||||||
typename websocketpp::server<T>::message_ptr msg ){
|
typename websocketpp::server<T>::message_ptr msg ){
|
||||||
_server_thread.async( [this,hdl,msg](){
|
_server_thread.async( [this,hdl,msg](){
|
||||||
auto current_con = _connections.find(hdl);
|
auto current_con = _connections.find(hdl);
|
||||||
if( current_con == _connections.end() )
|
if( current_con == _connections.end() )
|
||||||
return;
|
return;
|
||||||
auto payload = msg->get_payload();
|
auto payload = msg->get_payload();
|
||||||
std::shared_ptr<websocket_connection> con = current_con->second;
|
std::shared_ptr<websocket_connection> con = current_con->second;
|
||||||
wlog( "[IN] ${remote_endpoint} ${msg}",
|
wlog( "[IN] ${remote_endpoint} ${msg}",
|
||||||
("remote_endpoint",con->get_remote_endpoint_string()) ("msg",payload) );
|
("remote_endpoint",con->get_remote_endpoint_string()) ("msg",payload) );
|
||||||
++_pending_messages;
|
++_pending_messages;
|
||||||
auto f = fc::async([this,con,payload](){
|
auto f = fc::async([this,con,payload](){
|
||||||
if( _pending_messages )
|
if( _pending_messages )
|
||||||
--_pending_messages;
|
--_pending_messages;
|
||||||
con->on_message( payload );
|
con->on_message( payload );
|
||||||
});
|
});
|
||||||
if( _pending_messages > 100 )
|
if( _pending_messages > 100 )
|
||||||
f.wait(); // Note: this is a bit strange, because it forces the server to process all
|
f.wait(); // Note: this is a bit strange, because it forces the server to process all
|
||||||
// 100 pending messages (assuming this message is the last one) before
|
// 100 pending messages (assuming this message is the last one) before
|
||||||
// trying to accept a new message.
|
// trying to accept a new message.
|
||||||
// Ideally the `wait` should be canceled immediately when the number of
|
// Ideally the `wait` should be canceled immediately when the number of
|
||||||
// pending messages falls below 100. That said, wait on the whole queue,
|
// pending messages falls below 100. That said, wait on the whole queue,
|
||||||
// but not wait on one message.
|
// but not wait on one message.
|
||||||
}).wait();
|
}).wait();
|
||||||
});
|
});
|
||||||
|
|
||||||
_server.set_socket_init_handler( [this]( websocketpp::connection_hdl hdl,
|
_server.set_socket_init_handler( [this]( websocketpp::connection_hdl hdl,
|
||||||
typename websocketpp::server<T>::connection_type::socket_type& s ) {
|
typename websocketpp::server<T>::connection_type::socket_type& s ) {
|
||||||
boost::asio::ip::tcp::no_delay option(true);
|
boost::asio::ip::tcp::no_delay option(true);
|
||||||
s.lowest_layer().set_option(option);
|
s.lowest_layer().set_option(option);
|
||||||
} );
|
} );
|
||||||
|
|
||||||
_server.set_http_handler( [this]( connection_hdl hdl ){
|
_server.set_http_handler( [this]( connection_hdl hdl ){
|
||||||
_server_thread.async( [this,hdl](){
|
_server_thread.async( [this,hdl](){
|
||||||
auto con = _server.get_con_from_hdl(hdl);
|
auto con = _server.get_con_from_hdl(hdl);
|
||||||
auto current_con = std::make_shared<possibly_proxied_websocket_connection<
|
auto current_con = std::make_shared<possibly_proxied_websocket_connection<
|
||||||
typename websocketpp::server<T>::connection_ptr>>( con, _forward_header_key );
|
typename websocketpp::server<T>::connection_ptr>>( con, _forward_header_key );
|
||||||
_on_connection( current_con );
|
_on_connection( current_con );
|
||||||
|
|
||||||
con->defer_http_response(); // Note: this can tie up resources if send_http_response() is not
|
con->defer_http_response(); // Note: this can tie up resources if send_http_response() is not
|
||||||
// called quickly enough
|
// called quickly enough
|
||||||
std::string remote_endpoint = current_con->get_remote_endpoint_string();
|
std::string remote_endpoint = current_con->get_remote_endpoint_string();
|
||||||
std::string request_body = con->get_request_body();
|
std::string request_body = con->get_request_body();
|
||||||
wlog( "[HTTP-IN] ${remote_endpoint} ${msg}",
|
wlog( "[HTTP-IN] ${remote_endpoint} ${msg}",
|
||||||
("remote_endpoint",remote_endpoint) ("msg",request_body) );
|
("remote_endpoint",remote_endpoint) ("msg",request_body) );
|
||||||
|
|
||||||
fc::async([current_con, request_body, con, remote_endpoint] {
|
fc::async([current_con, request_body, con, remote_endpoint] {
|
||||||
fc::http::reply response = current_con->on_http(request_body);
|
fc::http::reply response = current_con->on_http(request_body);
|
||||||
ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}",
|
ilog( "[HTTP-OUT] ${remote_endpoint} ${status} ${msg}",
|
||||||
("remote_endpoint",remote_endpoint)
|
("remote_endpoint",remote_endpoint)
|
||||||
("status",response.status)
|
("status",response.status)
|
||||||
("msg",response.body_as_string) );
|
("msg",response.body_as_string) );
|
||||||
con->set_body( std::move( response.body_as_string ) );
|
con->set_body( std::move( response.body_as_string ) );
|
||||||
con->set_status( websocketpp::http::status_code::value(response.status) );
|
con->set_status( websocketpp::http::status_code::value(response.status) );
|
||||||
con->send_http_response();
|
con->send_http_response();
|
||||||
current_con->closed();
|
current_con->closed();
|
||||||
}, "call on_http");
|
}, "call on_http");
|
||||||
}).wait();
|
}).wait();
|
||||||
});
|
});
|
||||||
|
|
||||||
_server.set_close_handler( [this]( connection_hdl hdl ){
|
_server.set_close_handler( [this]( connection_hdl hdl ){
|
||||||
_server_thread.async( [this,hdl](){
|
_server_thread.async( [this,hdl](){
|
||||||
if( _connections.find(hdl) != _connections.end() )
|
if( _connections.find(hdl) != _connections.end() )
|
||||||
{
|
{
|
||||||
_connections[hdl]->closed();
|
_connections[hdl]->closed();
|
||||||
_connections.erase( hdl );
|
_connections.erase( hdl );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
wlog( "unknown connection closed" );
|
wlog( "unknown connection closed" );
|
||||||
}
|
}
|
||||||
if( _connections.empty() && _closed )
|
if( _connections.empty() && _closed )
|
||||||
_closed->set_value();
|
_closed->set_value();
|
||||||
}).wait();
|
}).wait();
|
||||||
});
|
});
|
||||||
|
|
||||||
_server.set_fail_handler( [this]( connection_hdl hdl ){
|
_server.set_fail_handler( [this]( connection_hdl hdl ){
|
||||||
if( _server.is_listening() )
|
if( _server.is_listening() )
|
||||||
{
|
{
|
||||||
_server_thread.async( [this,hdl](){
|
_server_thread.async( [this,hdl](){
|
||||||
if( _connections.find(hdl) != _connections.end() )
|
if( _connections.find(hdl) != _connections.end() )
|
||||||
{
|
{
|
||||||
_connections[hdl]->closed();
|
_connections[hdl]->closed();
|
||||||
_connections.erase( hdl );
|
_connections.erase( hdl );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
wlog( "unknown connection failed" );
|
wlog( "unknown connection failed" );
|
||||||
}
|
}
|
||||||
if( _connections.empty() && _closed )
|
if( _connections.empty() && _closed )
|
||||||
_closed->set_value();
|
_closed->set_value();
|
||||||
}).wait();
|
}).wait();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -628,29 +628,29 @@ namespace fc { namespace http {
|
||||||
}
|
}
|
||||||
void websocket_server::listen( const fc::ip::endpoint& ep )
|
void websocket_server::listen( const fc::ip::endpoint& ep )
|
||||||
{
|
{
|
||||||
my->_server.listen( boost::asio::ip::tcp::endpoint(
|
my->_server.listen( boost::asio::ip::tcp::endpoint(
|
||||||
boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) );
|
boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) );
|
||||||
}
|
}
|
||||||
|
|
||||||
uint16_t websocket_server::get_listening_port()
|
uint16_t websocket_server::get_listening_port()
|
||||||
{
|
{
|
||||||
websocketpp::lib::asio::error_code ec;
|
websocketpp::lib::asio::error_code ec;
|
||||||
return my->_server.get_local_endpoint(ec).port();
|
return my->_server.get_local_endpoint(ec).port();
|
||||||
}
|
}
|
||||||
|
|
||||||
void websocket_server::start_accept() {
|
void websocket_server::start_accept() {
|
||||||
my->_server.start_accept();
|
my->_server.start_accept();
|
||||||
}
|
}
|
||||||
|
|
||||||
void websocket_server::stop_listening()
|
void websocket_server::stop_listening()
|
||||||
{
|
{
|
||||||
my->_server.stop_listening();
|
my->_server.stop_listening();
|
||||||
}
|
}
|
||||||
|
|
||||||
void websocket_server::close()
|
void websocket_server::close()
|
||||||
{
|
{
|
||||||
for (auto& connection : my->_connections)
|
for( auto& connection : my->_connections )
|
||||||
my->_server.close(connection.first, websocketpp::close::status::normal, "Goodbye");
|
my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye" );
|
||||||
}
|
}
|
||||||
|
|
||||||
websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password,
|
websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password,
|
||||||
|
|
@ -708,15 +708,15 @@ namespace fc { namespace http {
|
||||||
|
|
||||||
void websocket_client::close()
|
void websocket_client::close()
|
||||||
{
|
{
|
||||||
if (my->_hdl)
|
if( my->_hdl )
|
||||||
my->_client.close(*my->_hdl, websocketpp::close::status::normal, "Goodbye");
|
my->_client.close( *my->_hdl, websocketpp::close::status::normal, "Goodbye" );
|
||||||
}
|
}
|
||||||
|
|
||||||
void websocket_client::synchronous_close()
|
void websocket_client::synchronous_close()
|
||||||
{
|
{
|
||||||
close();
|
close();
|
||||||
if (my->_closed)
|
if( my->_closed )
|
||||||
my->_closed->wait();
|
my->_closed->wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
void websocket_client::append_header(const std::string& key, const std::string& value)
|
void websocket_client::append_header(const std::string& key, const std::string& value)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue