Add promise to wait for closure of server socket

This may fix the issues that a websocket server may crash or hang when quitting
This commit is contained in:
abitmore 2020-06-20 11:36:21 -04:00
parent 9ae294293f
commit aa671d61b5

View file

@ -322,66 +322,82 @@ namespace fc { namespace http {
{ {
_connections[hdl]->closed(); _connections[hdl]->closed();
_connections.erase( hdl ); _connections.erase( hdl );
if( _connections.empty() && _all_connections_closed )
_all_connections_closed->set_value();
} }
else else
{ {
wlog( "unknown connection closed" ); wlog( "unknown connection closed" );
} }
if( _connections.empty() && _closed )
_closed->set_value();
}).wait(); }).wait();
}); });
_server.set_fail_handler( [this]( connection_hdl hdl ){ _server.set_fail_handler( [this]( connection_hdl hdl ){
if( _server.is_listening() ) _server_thread.async( [this,hdl](){
{ if( _connections.find(hdl) != _connections.end() )
_server_thread.async( [this,hdl](){ {
if( _connections.find(hdl) != _connections.end() ) _connections[hdl]->closed();
{ _connections.erase( hdl );
_connections[hdl]->closed(); if( _connections.empty() && _all_connections_closed )
_connections.erase( hdl ); _all_connections_closed->set_value();
} }
else
{
// if the server is shutting down, assume this hdl is the server socket
if( _server_socket_closed )
_server_socket_closed->set_value();
else else
{
wlog( "unknown connection failed" ); wlog( "unknown connection failed" );
} }
if( _connections.empty() && _closed ) }).wait();
_closed->set_value();
}).wait();
}
}); });
} }
virtual ~generic_websocket_server_impl() virtual ~generic_websocket_server_impl()
{ {
if( _server.is_listening() ) if( _server.is_listening() )
{
// _server.stop_listening() may trigger the on_fail callback function (the lambda function set by
// _server.set_fail_handler(...) ) for the listening server socket (note: the connection handle
// associated with the server socket is not in our connection map),
// the on_fail callback function may fire (async) a new task which may run really late
// and it will try to access the member variables of this server object,
// so we need to wait for it before destructing this object.
_server_socket_closed = promise<void>::create();
_server.stop_listening(); _server.stop_listening();
}
// Note: since _connections can be modified by lambda functions in set_*_handler, which are running // Note: since _connections can be modified by lambda functions in set_*_handler, which are running
// in another thread, perhaps we need to wait for them (especially the one in set_open_handler) // in other tasks, perhaps we need to wait for them (especially the one in set_open_handler)
// being processed. Otherwise `_closed.wait()` may hang. // being processed. Otherwise `_all_connections_closed.wait()` may hang.
if( _connections.size() ) if( !_connections.empty() )
{ {
_closed = promise<void>::create(); _all_connections_closed = promise<void>::create();
auto cpy_con = _connections; auto cpy_con = _connections;
websocketpp::lib::error_code ec;
for( auto& item : cpy_con ) for( auto& item : cpy_con )
_server.close( item.first, 0, "server exit" ); _server.close( item.first, 0, "server exit", ec );
_closed->wait(); _all_connections_closed->wait();
} }
if( _server_socket_closed )
_server_socket_closed->wait();
} }
typedef std::map<connection_hdl, websocket_connection_ptr, std::owner_less<connection_hdl> > con_map; typedef std::map<connection_hdl, websocket_connection_ptr, std::owner_less<connection_hdl> > con_map;
con_map _connections; // Note: std::map is not thread-safe nor task-safe, we may need // Note: std::map is not thread-safe nor task-safe, we may need
// to use a mutex or similar to avoid concurrent access. // to use a mutex or similar to avoid concurrent access.
fc::thread& _server_thread; con_map _connections; ///< Holds accepted connections
websocketpp::server<T> _server; fc::thread& _server_thread; ///< The thread that runs the server
on_connection_handler _on_connection; websocketpp::server<T> _server; ///< The server
fc::promise<void>::ptr _closed; on_connection_handler _on_connection; ///< A handler to be called when a new connection is accepted
uint32_t _pending_messages = 0; fc::promise<void>::ptr _all_connections_closed; ///< Promise to wait for all connections to be closed
std::string _forward_header_key; // A header like "X-Forwarded-For" (XFF), with data IP:port fc::promise<void>::ptr _server_socket_closed; ///< Promise to wait for the server socket to be closed
uint32_t _pending_messages = 0; ///< Number of messages not processed, for rate limiting
std::string _forward_header_key; ///< A header like "X-Forwarded-For" (XFF) with data IP:port
}; };
class websocket_server_impl : public generic_websocket_server_impl<asio_with_stub_log> class websocket_server_impl : public generic_websocket_server_impl<asio_with_stub_log>
@ -649,8 +665,9 @@ namespace fc { namespace http {
void websocket_server::close() void websocket_server::close()
{ {
websocketpp::lib::error_code ec;
for( auto& connection : my->_connections ) for( auto& connection : my->_connections )
my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye" ); my->_server.close( connection.first, websocketpp::close::status::normal, "Goodbye", ec );
} }
websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password, websocket_tls_server::websocket_tls_server( const string& server_pem, const string& ssl_password,