Add descriptions for a bunch of async tasks, remove logging during thread::quit to avoid crashes for now until we cleanup thread quit code
This commit is contained in:
parent
3be05ef822
commit
9e320a3db8
8 changed files with 16 additions and 15 deletions
|
|
@ -17,7 +17,7 @@ namespace fc {
|
||||||
struct cin_buffer {
|
struct cin_buffer {
|
||||||
cin_buffer():eof(false),write_pos(0),read_pos(0),cinthread("cin"){
|
cin_buffer():eof(false),write_pos(0),read_pos(0),cinthread("cin"){
|
||||||
|
|
||||||
cinthread.async( [=](){read();} );
|
cinthread.async( [=](){read();}, "cin_buffer::read" );
|
||||||
}
|
}
|
||||||
|
|
||||||
void read() {
|
void read() {
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ namespace fc {
|
||||||
FC_ASSERT( _compression_thread );
|
FC_ASSERT( _compression_thread );
|
||||||
if( !_compression_thread->is_current() )
|
if( !_compression_thread->is_current() )
|
||||||
{
|
{
|
||||||
_compression_thread->async( [this, filename]() { compress_file( filename ); } ).wait();
|
_compression_thread->async( [this, filename]() { compress_file( filename ); }, "compress_file" ).wait();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,7 +74,7 @@ namespace fc {
|
||||||
if( cfg.rotation_compression )
|
if( cfg.rotation_compression )
|
||||||
_compression_thread.reset( new thread( "compression") );
|
_compression_thread.reset( new thread( "compression") );
|
||||||
|
|
||||||
_rotation_task = async( [this]() { rotate_files( true ); } );
|
_rotation_task = async( [this]() { rotate_files( true ); }, "rotate_files" );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ namespace fc { namespace http {
|
||||||
impl(){}
|
impl(){}
|
||||||
impl(const fc::ip::endpoint& p ) {
|
impl(const fc::ip::endpoint& p ) {
|
||||||
tcp_serv.listen(p);
|
tcp_serv.listen(p);
|
||||||
accept_complete = fc::async([this](){ this->accept_loop(); });
|
accept_complete = fc::async([this](){ this->accept_loop(); }, "http_server accept_loop");
|
||||||
}
|
}
|
||||||
fc::future<void> accept_complete;
|
fc::future<void> accept_complete;
|
||||||
~impl() {
|
~impl() {
|
||||||
|
|
@ -66,7 +66,7 @@ namespace fc { namespace http {
|
||||||
http::connection_ptr con = std::make_shared<http::connection>();
|
http::connection_ptr con = std::make_shared<http::connection>();
|
||||||
tcp_serv.accept( con->get_socket() );
|
tcp_serv.accept( con->get_socket() );
|
||||||
//ilog( "Accept Connection" );
|
//ilog( "Accept Connection" );
|
||||||
fc::async( [=](){ handle_connection( con, on_req ); } );
|
fc::async( [=](){ handle_connection( con, on_req ); }, "http_server handle_connection" );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -144,7 +144,7 @@ namespace fc { namespace http {
|
||||||
if( false || my->handle_next_req ) {
|
if( false || my->handle_next_req ) {
|
||||||
ilog( "handle next request..." );
|
ilog( "handle next request..." );
|
||||||
//fc::async( std::function<void()>(my->handle_next_req) );
|
//fc::async( std::function<void()>(my->handle_next_req) );
|
||||||
fc::async( my->handle_next_req );
|
fc::async( my->handle_next_req, "http_server handle_next_req" );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -160,7 +160,7 @@ namespace fc
|
||||||
{
|
{
|
||||||
wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" );
|
wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" );
|
||||||
}
|
}
|
||||||
}).wait();
|
}, "ntp_shutdown_task").wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -213,7 +213,7 @@ namespace fc
|
||||||
|
|
||||||
// launch the read processing loop it if isn't running, or signal it to resume if it's paused.
|
// launch the read processing loop it if isn't running, or signal it to resume if it's paused.
|
||||||
if (!_process_pending_reads_loop_complete.valid() || _process_pending_reads_loop_complete.ready())
|
if (!_process_pending_reads_loop_complete.valid() || _process_pending_reads_loop_complete.ready())
|
||||||
_process_pending_reads_loop_complete = async([=](){ process_pending_reads(); });
|
_process_pending_reads_loop_complete = async([=](){ process_pending_reads(); }, "process_pending_reads" );
|
||||||
else if (_new_read_operation_available_promise)
|
else if (_new_read_operation_available_promise)
|
||||||
_new_read_operation_available_promise->set_value();
|
_new_read_operation_available_promise->set_value();
|
||||||
|
|
||||||
|
|
@ -238,7 +238,7 @@ namespace fc
|
||||||
|
|
||||||
// launch the write processing loop it if isn't running, or signal it to resume if it's paused.
|
// launch the write processing loop it if isn't running, or signal it to resume if it's paused.
|
||||||
if (!_process_pending_writes_loop_complete.valid() || _process_pending_writes_loop_complete.ready())
|
if (!_process_pending_writes_loop_complete.valid() || _process_pending_writes_loop_complete.ready())
|
||||||
_process_pending_writes_loop_complete = async([=](){ process_pending_writes(); });
|
_process_pending_writes_loop_complete = async([=](){ process_pending_writes(); }, "process_pending_writes");
|
||||||
else if (_new_write_operation_available_promise)
|
else if (_new_write_operation_available_promise)
|
||||||
_new_write_operation_available_promise->set_value();
|
_new_write_operation_available_promise->set_value();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ namespace fc {
|
||||||
UDT::startup();
|
UDT::startup();
|
||||||
check_udt_errors();
|
check_udt_errors();
|
||||||
_epoll_id = UDT::epoll_create();
|
_epoll_id = UDT::epoll_create();
|
||||||
_epoll_loop = _epoll_thread.async( [=](){ poll_loop(); } );
|
_epoll_loop = _epoll_thread.async( [=](){ poll_loop(); }, "udt_poll_loop" );
|
||||||
}
|
}
|
||||||
|
|
||||||
~udt_epoll_service()
|
~udt_epoll_service()
|
||||||
|
|
|
||||||
|
|
@ -204,7 +204,7 @@ namespace fc { namespace rpc {
|
||||||
variant v = json::from_stream(*_in);
|
variant v = json::from_stream(*_in);
|
||||||
///ilog( "input: ${in}", ("in", v ) );
|
///ilog( "input: ${in}", ("in", v ) );
|
||||||
//wlog( "recv: ${line}", ("line", line) );
|
//wlog( "recv: ${line}", ("line", line) );
|
||||||
fc::async([=](){ handle_message(v.get_object()); });
|
fc::async([=](){ handle_message(v.get_object()); }, "json_connection handle_message");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch ( eof_exception& eof )
|
catch ( eof_exception& eof )
|
||||||
|
|
@ -263,7 +263,7 @@ namespace fc { namespace rpc {
|
||||||
{
|
{
|
||||||
FC_THROW_EXCEPTION( assert_exception, "start should only be called once" );
|
FC_THROW_EXCEPTION( assert_exception, "start should only be called once" );
|
||||||
}
|
}
|
||||||
return my->_done = fc::async( [=](){ my->read_loop(); } );
|
return my->_done = fc::async( [=](){ my->read_loop(); }, "json_connection read_loop" );
|
||||||
}
|
}
|
||||||
|
|
||||||
void json_connection::add_method( const fc::string& name, method m )
|
void json_connection::add_method( const fc::string& name, method m )
|
||||||
|
|
|
||||||
|
|
@ -129,7 +129,7 @@ namespace fc {
|
||||||
//if quitting from a different thread, start quit task on thread.
|
//if quitting from a different thread, start quit task on thread.
|
||||||
//If we have and know our attached boost thread, wait for it to finish, then return.
|
//If we have and know our attached boost thread, wait for it to finish, then return.
|
||||||
if( ¤t() != this ) {
|
if( ¤t() != this ) {
|
||||||
async( [=](){quit();} );//.wait();
|
async( [=](){quit();}, "thread::quit" );//.wait();
|
||||||
if( my->boost_thread ) {
|
if( my->boost_thread ) {
|
||||||
auto n = name();
|
auto n = name();
|
||||||
my->boost_thread->join();
|
my->boost_thread->join();
|
||||||
|
|
@ -139,7 +139,7 @@ namespace fc {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
wlog( "${s}", ("s",name()) );
|
//wlog( "${s}", ("s",name()) );
|
||||||
// We are quiting from our own thread...
|
// We are quiting from our own thread...
|
||||||
|
|
||||||
// break all promises, thread quit!
|
// break all promises, thread quit!
|
||||||
|
|
@ -155,7 +155,7 @@ namespace fc {
|
||||||
cur = n;
|
cur = n;
|
||||||
}
|
}
|
||||||
if( my->blocked ) {
|
if( my->blocked ) {
|
||||||
wlog( "still blocking... whats up with that?");
|
//wlog( "still blocking... whats up with that?");
|
||||||
debug( "on quit" );
|
debug( "on quit" );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -289,6 +289,7 @@ namespace fc {
|
||||||
void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) {
|
void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) {
|
||||||
assert(my);
|
assert(my);
|
||||||
t->_when = tp;
|
t->_when = tp;
|
||||||
|
t->_desc = desc;
|
||||||
// slog( "when %lld", t->_when.time_since_epoch().count() );
|
// slog( "when %lld", t->_when.time_since_epoch().count() );
|
||||||
// slog( "delay %lld", (tp - fc::time_point::now()).count() );
|
// slog( "delay %lld", (tp - fc::time_point::now()).count() );
|
||||||
task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
|
task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue