Call fc:🧵:quit() on thread destruction, and remove code that explicitly terminates threads from the destructors of the objects that own those threads.

Fix fc::thread to set the thread name in the Debugger when the name is set after thread construction.
When terminating a thread, cancel any tasks that have been schedule()d or async()ed but have not yet started executing.
When canceling a task blocked on a fc::mutex, notify the thread to reschedule the task to allow it to clean up the mutex's block list.
Fix a bug in managing the recursive lock count when tasks block on a fc::mutex
Reorder the code that manages the hard links for log files to avoid an exception generated by unlinking a locked file on Windows.
This commit is contained in:
Eric Frias 2014-10-21 10:25:28 -04:00
parent c1eca45ef1
commit 307252e23a
7 changed files with 125 additions and 86 deletions

View file

@ -157,6 +157,7 @@ namespace fc {
void async_task( task_base* t, const priority& p, const time_point& tp );
void notify_task_has_been_canceled();
void unblock(fc::context* c);
class thread_d* my;
};

View file

@ -88,15 +88,6 @@ namespace fc {
catch( ... )
{
}
try
{
if( _compression_thread )
_compression_thread->quit();
}
catch( ... )
{
}
}
void rotate_files( bool initializing = false )
@ -124,11 +115,10 @@ namespace fc {
out.flush();
out.close();
}
remove_all(link_filename); // on windows, you can't delete the link while the underlying file is opened for writing
out.open( log_filename );
}
remove_all( link_filename );
create_hard_link( log_filename, link_filename );
create_hard_link(log_filename, link_filename);
/* Delete old log files */
fc::time_point limit_time = now - cfg.rotation_limit;
@ -160,6 +150,10 @@ namespace fc {
compress_file( *itr );
}
}
catch (const fc::canceled_exception&)
{
throw;
}
catch( ... )
{
}

View file

@ -44,7 +44,6 @@ namespace fc
~ntp_impl()
{
_ntp_thread.quit(); //TODO: this can be removed once fc::threads call quit during destruction
}
fc::time_point ntp_timestamp_to_fc_time_point(uint64_t ntp_timestamp_net_order)

View file

@ -80,6 +80,7 @@ namespace fc {
* the current context is the tail in the wait queue.
*/
bool mutex::try_lock() {
assert(false); // this is currently broken re: recursive mutexes
fc::thread* ct = &fc::thread::current();
fc::context* cc = ct->my->current;
fc::context* n = 0;
@ -97,6 +98,7 @@ namespace fc {
}
bool mutex::try_lock_until( const fc::time_point& abs_time ) {
assert(false); // this is currently broken re: recursive mutexes
fc::context* n = 0;
fc::context* cc = fc::thread::current().my->current;
@ -152,7 +154,6 @@ namespace fc {
// add ourselves to the head of the list
current_context->next_blocked_mutex = m_blist;
m_blist = current_context;
++recursive_lock_count;
#if 0
int cnt = 0;
@ -170,6 +171,8 @@ namespace fc {
fc::thread::current().yield(false);
// if yield() returned normally, we should now own the lock (we should be at the tail of the list)
BOOST_ASSERT( current_context->next_blocked_mutex == 0 );
assert(recursive_lock_count == 0);
recursive_lock_count = 1;
}
catch ( exception& e )
{

View file

@ -65,6 +65,11 @@ namespace fc {
promise_base::cancel(reason);
if (_active_context)
{
if (_active_context->next_blocked_mutex)
{
// this task is blocked on a mutex, we probably don't handle this correctly
_active_context->ctx_thread->unblock(_active_context);
}
_active_context->canceled = true;
#ifndef NDEBUG
_active_context->cancellation_reason = reason;

View file

@ -74,7 +74,7 @@ namespace fc {
promise<void>::ptr p(new promise<void>("thread start"));
boost::thread* t = new boost::thread( [this,p,name]() {
try {
set_thread_name(name.c_str()); // set thread's name for the debugger to display
set_thread_name(name.c_str()); // set thread's name for the debugger to display
this->my = new thread_d(*this);
current_thread() = this;
p->set_value();
@ -91,7 +91,7 @@ namespace fc {
} );
p->wait();
my->boost_thread = t;
set_name(name);
my->name = name;
}
thread::thread( thread_d* ) {
my = new thread_d(*this);
@ -109,20 +109,34 @@ namespace fc {
thread::~thread() {
//wlog( "my ${n}", ("n",name()) );
if( is_current() )
if( my )
{
wlog( "delete my" );
delete my;
wlog( "calling quit()" );
quit(); // deletes `my`
}
my = 0;
}
thread& thread::current() {
if( !current_thread() ) current_thread() = new thread((thread_d*)0);
if( !current_thread() )
current_thread() = new thread((thread_d*)0);
return *current_thread();
}
const string& thread::name()const { return my->name; }
void thread::set_name( const fc::string& n ) { my->name = n; }
const string& thread::name()const
{
return my->name;
}
void thread::set_name( const fc::string& n )
{
if (!is_current())
{
async([=](){ set_name(n); }, "set_name").wait();
return;
}
my->name = n;
set_thread_name(my->name.c_str()); // set thread's name for the debugger to display
}
const char* thread::current_task_desc() const
{
@ -133,74 +147,88 @@ namespace fc {
void thread::debug( const fc::string& d ) { /*my->debug(d);*/ }
void thread::quit() {
//if quitting from a different thread, start quit task on thread.
//If we have and know our attached boost thread, wait for it to finish, then return.
if( &current() != this ) {
async( [=](){quit();}, "thread::quit" );//.wait();
if( my->boost_thread ) {
auto n = name();
my->boost_thread->join();
delete my;
my = nullptr;
}
return;
void thread::quit()
{
//if quitting from a different thread, start quit task on thread.
//If we have and know our attached boost thread, wait for it to finish, then return.
if( &current() != this )
{
async( [=](){quit();}, "thread::quit" );//.wait();
if( my->boost_thread )
{
my->boost_thread->join();
delete my;
my = nullptr;
}
return;
}
// wlog( "${s}", ("s",name()) );
// We are quiting from our own thread...
my->done = true;
// wlog( "${s}", ("s",name()) );
// We are quiting from our own thread...
// break all promises, thread quit!
while( my->blocked ) {
fc::context* cur = my->blocked;
while( cur ) {
fc::context* n = cur->next;
// this will move the context into the ready list.
//cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) );
//cur->set_exception_on_blocking_promises( thread_quit() );
cur->set_exception_on_blocking_promises( std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) );
cur = n;
}
if( my->blocked ) {
//wlog( "still blocking... whats up with that?");
debug( "on quit" );
}
}
BOOST_ASSERT( my->blocked == 0 );
//my->blocked = 0;
// move all sleep tasks to ready
for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) {
my->add_context_to_ready_list( my->sleep_pqueue[i] );
}
my->sleep_pqueue.clear();
// move all idle tasks to ready
fc::context* cur = my->pt_head;
while( cur ) {
// break all promises, thread quit!
while( my->blocked )
{
fc::context* cur = my->blocked;
while( cur )
{
fc::context* n = cur->next;
cur->next = 0;
my->add_context_to_ready_list( cur );
// this will move the context into the ready list.
//cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) );
//cur->set_exception_on_blocking_promises( thread_quit() );
cur->set_exception_on_blocking_promises( std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) );
cur = n;
}
// mark all ready tasks (should be everyone)... as canceled
for (fc::context* ready_context : my->ready_heap)
ready_context->canceled = true;
my->done = true;
// now that we have poked all fibers... switch to the next one and
// let them all quit.
while (!my->ready_heap.empty())
{
my->start_next_fiber(true);
my->check_for_timeouts();
if( my->blocked )
{
//wlog( "still blocking... whats up with that?");
debug( "on quit" );
}
my->clear_free_list();
my->cleanup_thread_specific_data();
}
}
BOOST_ASSERT( my->blocked == 0 );
//my->blocked = 0;
for (task_base* unstarted_task : my->task_pqueue)
unstarted_task->set_exception(std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")));
my->task_pqueue.clear();
for (task_base* scheduled_task : my->task_sch_queue)
scheduled_task->set_exception(std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")));
my->task_sch_queue.clear();
// move all sleep tasks to ready
for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i )
my->add_context_to_ready_list( my->sleep_pqueue[i] );
my->sleep_pqueue.clear();
// move all idle tasks to ready
fc::context* cur = my->pt_head;
while( cur )
{
fc::context* n = cur->next;
cur->next = 0;
my->add_context_to_ready_list( cur );
cur = n;
}
// mark all ready tasks (should be everyone)... as canceled
for (fc::context* ready_context : my->ready_heap)
ready_context->canceled = true;
// now that we have poked all fibers... switch to the next one and
// let them all quit.
while (!my->ready_heap.empty())
{
my->start_next_fiber(true);
my->check_for_timeouts();
}
my->clear_free_list();
my->cleanup_thread_specific_data();
}
void thread::exec()
{
@ -450,6 +478,12 @@ namespace fc {
async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() );
}
void thread::unblock(fc::context* c)
{
my->unblock(c);
}
#ifdef _MSC_VER
/* support for providing a structured exception handler for async tasks */
namespace detail

View file

@ -332,13 +332,16 @@ namespace fc {
*/
void check_fiber_exceptions()
{
if( current && current->canceled ) {
if( current && current->canceled )
{
#ifdef NDEBUG
FC_THROW_EXCEPTION( canceled_exception, "" );
#else
FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: ${reason}", ("reason", current->cancellation_reason ? current->cancellation_reason : "[none given]"));
#endif
} else if( done ) {
}
else if( done )
{
ilog( "throwing canceled exception" );
FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" );
// BOOST_THROW_EXCEPTION( thread_quit() );