diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index 0f69900..2aceb5e 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -157,6 +157,7 @@ namespace fc { void async_task( task_base* t, const priority& p, const time_point& tp ); void notify_task_has_been_canceled(); + void unblock(fc::context* c); class thread_d* my; }; diff --git a/src/log/file_appender.cpp b/src/log/file_appender.cpp index feed0cb..283ebf9 100644 --- a/src/log/file_appender.cpp +++ b/src/log/file_appender.cpp @@ -88,15 +88,6 @@ namespace fc { catch( ... ) { } - - try - { - if( _compression_thread ) - _compression_thread->quit(); - } - catch( ... ) - { - } } void rotate_files( bool initializing = false ) @@ -124,11 +115,10 @@ namespace fc { out.flush(); out.close(); } - + remove_all(link_filename); // on windows, you can't delete the link while the underlying file is opened for writing out.open( log_filename ); } - remove_all( link_filename ); - create_hard_link( log_filename, link_filename ); + create_hard_link(log_filename, link_filename); /* Delete old log files */ fc::time_point limit_time = now - cfg.rotation_limit; @@ -160,6 +150,10 @@ namespace fc { compress_file( *itr ); } } + catch (const fc::canceled_exception&) + { + throw; + } catch( ... ) { } diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index bd7d43a..2122d93 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -44,7 +44,6 @@ namespace fc ~ntp_impl() { - _ntp_thread.quit(); //TODO: this can be removed once fc::threads call quit during destruction } fc::time_point ntp_timestamp_to_fc_time_point(uint64_t ntp_timestamp_net_order) diff --git a/src/thread/mutex.cpp b/src/thread/mutex.cpp index 1dac1fb..0ebd834 100644 --- a/src/thread/mutex.cpp +++ b/src/thread/mutex.cpp @@ -80,6 +80,7 @@ namespace fc { * the current context is the tail in the wait queue. */ bool mutex::try_lock() { + assert(false); // this is currently broken re: recursive mutexes fc::thread* ct = &fc::thread::current(); fc::context* cc = ct->my->current; fc::context* n = 0; @@ -97,6 +98,7 @@ namespace fc { } bool mutex::try_lock_until( const fc::time_point& abs_time ) { + assert(false); // this is currently broken re: recursive mutexes fc::context* n = 0; fc::context* cc = fc::thread::current().my->current; @@ -152,7 +154,6 @@ namespace fc { // add ourselves to the head of the list current_context->next_blocked_mutex = m_blist; m_blist = current_context; - ++recursive_lock_count; #if 0 int cnt = 0; @@ -170,6 +171,8 @@ namespace fc { fc::thread::current().yield(false); // if yield() returned normally, we should now own the lock (we should be at the tail of the list) BOOST_ASSERT( current_context->next_blocked_mutex == 0 ); + assert(recursive_lock_count == 0); + recursive_lock_count = 1; } catch ( exception& e ) { diff --git a/src/thread/task.cpp b/src/thread/task.cpp index 9f6b6f0..7f45522 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -65,6 +65,11 @@ namespace fc { promise_base::cancel(reason); if (_active_context) { + if (_active_context->next_blocked_mutex) + { + // this task is blocked on a mutex, we probably don't handle this correctly + _active_context->ctx_thread->unblock(_active_context); + } _active_context->canceled = true; #ifndef NDEBUG _active_context->cancellation_reason = reason; diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index d93bba4..69de9f7 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -74,7 +74,7 @@ namespace fc { promise::ptr p(new promise("thread start")); boost::thread* t = new boost::thread( [this,p,name]() { try { - set_thread_name(name.c_str()); // set thread's name for the debugger to display + set_thread_name(name.c_str()); // set thread's name for the debugger to display this->my = new thread_d(*this); current_thread() = this; p->set_value(); @@ -91,7 +91,7 @@ namespace fc { } ); p->wait(); my->boost_thread = t; - set_name(name); + my->name = name; } thread::thread( thread_d* ) { my = new thread_d(*this); @@ -109,20 +109,34 @@ namespace fc { thread::~thread() { //wlog( "my ${n}", ("n",name()) ); - if( is_current() ) + if( my ) { - wlog( "delete my" ); - delete my; + wlog( "calling quit()" ); + quit(); // deletes `my` } - my = 0; } thread& thread::current() { - if( !current_thread() ) current_thread() = new thread((thread_d*)0); + if( !current_thread() ) + current_thread() = new thread((thread_d*)0); return *current_thread(); } - const string& thread::name()const { return my->name; } - void thread::set_name( const fc::string& n ) { my->name = n; } + + const string& thread::name()const + { + return my->name; + } + + void thread::set_name( const fc::string& n ) + { + if (!is_current()) + { + async([=](){ set_name(n); }, "set_name").wait(); + return; + } + my->name = n; + set_thread_name(my->name.c_str()); // set thread's name for the debugger to display + } const char* thread::current_task_desc() const { @@ -133,74 +147,88 @@ namespace fc { void thread::debug( const fc::string& d ) { /*my->debug(d);*/ } - void thread::quit() { - //if quitting from a different thread, start quit task on thread. - //If we have and know our attached boost thread, wait for it to finish, then return. - if( ¤t() != this ) { - async( [=](){quit();}, "thread::quit" );//.wait(); - if( my->boost_thread ) { - auto n = name(); - my->boost_thread->join(); - delete my; - my = nullptr; - } - return; + void thread::quit() + { + //if quitting from a different thread, start quit task on thread. + //If we have and know our attached boost thread, wait for it to finish, then return. + if( ¤t() != this ) + { + async( [=](){quit();}, "thread::quit" );//.wait(); + if( my->boost_thread ) + { + my->boost_thread->join(); + delete my; + my = nullptr; } + return; + } -// wlog( "${s}", ("s",name()) ); - // We are quiting from our own thread... + my->done = true; + // wlog( "${s}", ("s",name()) ); + // We are quiting from our own thread... - // break all promises, thread quit! - while( my->blocked ) { - fc::context* cur = my->blocked; - while( cur ) { - fc::context* n = cur->next; - // this will move the context into the ready list. - //cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) ); - //cur->set_exception_on_blocking_promises( thread_quit() ); - cur->set_exception_on_blocking_promises( std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) ); - - cur = n; - } - if( my->blocked ) { - //wlog( "still blocking... whats up with that?"); - debug( "on quit" ); - } - } - BOOST_ASSERT( my->blocked == 0 ); - //my->blocked = 0; - - - // move all sleep tasks to ready - for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { - my->add_context_to_ready_list( my->sleep_pqueue[i] ); - } - my->sleep_pqueue.clear(); - - // move all idle tasks to ready - fc::context* cur = my->pt_head; - while( cur ) { + // break all promises, thread quit! + while( my->blocked ) + { + fc::context* cur = my->blocked; + while( cur ) + { fc::context* n = cur->next; - cur->next = 0; - my->add_context_to_ready_list( cur ); + // this will move the context into the ready list. + //cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) ); + //cur->set_exception_on_blocking_promises( thread_quit() ); + cur->set_exception_on_blocking_promises( std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) ); + cur = n; } - - // mark all ready tasks (should be everyone)... as canceled - for (fc::context* ready_context : my->ready_heap) - ready_context->canceled = true; - my->done = true; - - // now that we have poked all fibers... switch to the next one and - // let them all quit. - while (!my->ready_heap.empty()) - { - my->start_next_fiber(true); - my->check_for_timeouts(); + if( my->blocked ) + { + //wlog( "still blocking... whats up with that?"); + debug( "on quit" ); } - my->clear_free_list(); - my->cleanup_thread_specific_data(); - } + } + BOOST_ASSERT( my->blocked == 0 ); + //my->blocked = 0; + + for (task_base* unstarted_task : my->task_pqueue) + unstarted_task->set_exception(std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting"))); + my->task_pqueue.clear(); + + for (task_base* scheduled_task : my->task_sch_queue) + scheduled_task->set_exception(std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting"))); + my->task_sch_queue.clear(); + + + + // move all sleep tasks to ready + for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) + my->add_context_to_ready_list( my->sleep_pqueue[i] ); + my->sleep_pqueue.clear(); + + // move all idle tasks to ready + fc::context* cur = my->pt_head; + while( cur ) + { + fc::context* n = cur->next; + cur->next = 0; + my->add_context_to_ready_list( cur ); + cur = n; + } + + // mark all ready tasks (should be everyone)... as canceled + for (fc::context* ready_context : my->ready_heap) + ready_context->canceled = true; + + // now that we have poked all fibers... switch to the next one and + // let them all quit. + while (!my->ready_heap.empty()) + { + my->start_next_fiber(true); + my->check_for_timeouts(); + } + my->clear_free_list(); + my->cleanup_thread_specific_data(); + } void thread::exec() { @@ -450,6 +478,12 @@ namespace fc { async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); } + void thread::unblock(fc::context* c) + { + my->unblock(c); + } + + #ifdef _MSC_VER /* support for providing a structured exception handler for async tasks */ namespace detail diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 5b357d2..f2855d8 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -332,13 +332,16 @@ namespace fc { */ void check_fiber_exceptions() { - if( current && current->canceled ) { + if( current && current->canceled ) + { #ifdef NDEBUG FC_THROW_EXCEPTION( canceled_exception, "" ); #else FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: ${reason}", ("reason", current->cancellation_reason ? current->cancellation_reason : "[none given]")); #endif - } else if( done ) { + } + else if( done ) + { ilog( "throwing canceled exception" ); FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" ); // BOOST_THROW_EXCEPTION( thread_quit() );