When the HTTP server asyncs a handler for a request, keep a future for that async and cancel any running asyncs when the HTTP server destructs

This commit is contained in:
Eric Frias 2014-10-17 16:18:55 -04:00
parent fadc0512a1
commit 891e880ad1

View file

@ -48,42 +48,86 @@ namespace fc { namespace http {
{ {
public: public:
impl(){} impl(){}
impl(const fc::ip::endpoint& p ) {
impl(const fc::ip::endpoint& p )
{
tcp_serv.set_reuse_address(); tcp_serv.set_reuse_address();
tcp_serv.listen(p); tcp_serv.listen(p);
accept_complete = fc::async([this](){ this->accept_loop(); }, "http_server accept_loop"); accept_complete = fc::async([this](){ this->accept_loop(); }, "http_server accept_loop");
} }
fc::future<void> accept_complete;
~impl() { ~impl()
try { {
try
{
tcp_serv.close(); tcp_serv.close();
if( accept_complete.valid() ) if (accept_complete.valid())
accept_complete.wait(); accept_complete.wait();
}catch(...){} }
catch (...)
{
}
for (fc::future<void>& request_in_progress : requests_in_progress)
{
try
{
request_in_progress.cancel_and_wait();
}
catch (const fc::exception& e)
{
wlog("Caught exception while canceling http request task: ${error}", ("error", e));
}
catch (const std::exception& e)
{
wlog("Caught exception while canceling http request task: ${error}", ("error", e.what()));
}
catch (...)
{
wlog("Caught unknown exception while canceling http request task");
}
}
requests_in_progress.clear();
} }
void accept_loop() {
while( !accept_complete.canceled() ) void accept_loop()
{ {
http::connection_ptr con = std::make_shared<http::connection>(); while( !accept_complete.canceled() )
tcp_serv.accept( con->get_socket() ); {
//ilog( "Accept Connection" ); http::connection_ptr con = std::make_shared<http::connection>();
fc::async( [=](){ handle_connection( con, on_req ); }, "http_server handle_connection" ); tcp_serv.accept( con->get_socket() );
} //ilog( "Accept Connection" );
// clean up futures for any completed requests
for (auto iter = requests_in_progress.begin(); iter != requests_in_progress.end();)
if (!iter->valid() || iter->ready())
iter = requests_in_progress.erase(iter);
else
++iter;
requests_in_progress.emplace_back(fc::async([=](){ handle_connection(con, on_req); }, "http_server handle_connection"));
}
} }
void handle_connection( const http::connection_ptr& c, void handle_connection( const http::connection_ptr& c,
std::function<void(const http::request&, const server::response& s )> do_on_req ) { std::function<void(const http::request&, const server::response& s )> do_on_req )
try { {
http::server::response rep( fc::shared_ptr<response::impl>( new response::impl(c) ) ); try
auto req = c->read_request(); {
if( do_on_req ) do_on_req( req, rep ); http::server::response rep( fc::shared_ptr<response::impl>( new response::impl(c) ) );
c->get_socket().close(); request req = c->read_request();
} catch ( fc::exception& e ) { if( do_on_req )
wlog( "unable to read request ${1}", ("1", e.to_detail_string() ) );//fc::except_str().c_str()); do_on_req( req, rep );
} c->get_socket().close();
//wlog( "done handle connection" ); }
catch ( fc::exception& e )
{
wlog( "unable to read request ${1}", ("1", e.to_detail_string() ) );//fc::except_str().c_str());
}
//wlog( "done handle connection" );
} }
std::function<void(const http::request&, const server::response& s )> on_req;
fc::future<void> accept_complete;
std::function<void(const http::request&, const server::response& s)> on_req;
std::vector<fc::future<void> > requests_in_progress;
fc::tcp_server tcp_serv; fc::tcp_server tcp_serv;
}; };