Added a mechanism to get notifications when a thread is idle

This commit is contained in:
Peter Conrad 2018-10-03 10:47:02 +02:00
parent fa7f6af01f
commit 9d54742741
4 changed files with 72 additions and 40 deletions

View file

@ -25,6 +25,7 @@ namespace fc {
}; };
void* get_task_specific_data(unsigned slot); void* get_task_specific_data(unsigned slot);
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
class idle_guard;
} }
class task_base : virtual public promise_base { class task_base : virtual public promise_base {
@ -53,6 +54,7 @@ namespace fc {
// thread/thread_private // thread/thread_private
friend class thread; friend class thread;
friend class thread_d; friend class thread_d;
friend class detail::idle_guard;
fwd<spin_lock,8> _spinlock; fwd<spin_lock,8> _spinlock;
// avoid rtti info for every possible functor... // avoid rtti info for every possible functor...

View file

@ -12,6 +12,7 @@ namespace fc {
namespace detail namespace detail
{ {
class pool_impl;
void* get_thread_specific_data(unsigned slot); void* get_thread_specific_data(unsigned slot);
void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
unsigned get_next_unused_task_storage_slot(); unsigned get_next_unused_task_storage_slot();
@ -19,9 +20,25 @@ namespace fc {
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
} }
/** Instances of this class can be used to get notifications when a thread is
* (or is no longer) idle.
*/
class thread_idle_notifier {
public:
/** This method is called when the thread is idle. If it returns a
* task_base it will be queued and executed immediately.
* @return a task to execute, or nullptr
*/
virtual task_base* idle() = 0;
/** This method is called when the thread is no longer idle, e. g. after
* it has woken up due to a timer or signal.
*/
virtual void busy() = 0;
};
class thread { class thread {
public: public:
thread( const std::string& name = "" ); thread( const std::string& name = "", thread_idle_notifier* notifier = 0 );
thread( thread&& m ) = delete; thread( thread&& m ) = delete;
thread& operator=(thread&& t ) = delete; thread& operator=(thread&& t ) = delete;
@ -135,6 +152,7 @@ namespace fc {
friend class task_base; friend class task_base;
friend class thread_d; friend class thread_d;
friend class mutex; friend class mutex;
friend class detail::pool_impl;
friend void* detail::get_thread_specific_data(unsigned slot); friend void* detail::get_thread_specific_data(unsigned slot);
friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
friend unsigned detail::get_next_unused_task_storage_slot(); friend unsigned detail::get_next_unused_task_storage_slot();

View file

@ -70,12 +70,12 @@ namespace fc {
return t; return t;
} }
thread::thread( const std::string& name ) { thread::thread( const std::string& name, thread_idle_notifier* notifier ) {
promise<void>::ptr p(new promise<void>("thread start")); promise<void>::ptr p(new promise<void>("thread start"));
boost::thread* t = new boost::thread( [this,p,name]() { boost::thread* t = new boost::thread( [this,p,name,notifier]() {
try { try {
set_thread_name(name.c_str()); // set thread's name for the debugger to display set_thread_name(name.c_str()); // set thread's name for the debugger to display
this->my = new thread_d(*this); this->my = new thread_d( *this, notifier );
current_thread() = this; current_thread() = this;
p->set_value(); p->set_value();
exec(); exec();
@ -85,26 +85,19 @@ namespace fc {
} catch ( ... ) { } catch ( ... ) {
wlog( "unhandled exception" ); wlog( "unhandled exception" );
p->set_exception( std::make_shared<unhandled_exception>( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); p->set_exception( std::make_shared<unhandled_exception>( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) );
//assert( !"unhandled exception" );
//elog( "Caught unhandled exception %s", boost::current_exception_diagnostic_information().c_str() );
} }
} ); } );
p->wait(); p->wait();
my->boost_thread = t; my->boost_thread = t;
my->name = name; my->name = name;
//wlog("name:${n} tid:${tid}", ("n", name)("tid", (uintptr_t)my->boost_thread->native_handle()) );
} }
thread::thread( thread_d* ) { thread::thread( thread_d* ) {
my = new thread_d(*this); my = new thread_d(*this);
} }
thread::~thread() { thread::~thread() {
//wlog( "my ${n}", ("n",name()) );
if( my ) if( my )
{
// wlog( "calling quit() on ${n}",("n",my->name) );
quit(); quit();
}
delete my; delete my;
} }
@ -129,7 +122,7 @@ namespace fc {
{ {
if (!is_current()) if (!is_current())
{ {
async([=](){ set_name(n); }, "set_name").wait(); async([this,n](){ set_name(n); }, "set_name").wait();
return; return;
} }
my->name = n; my->name = n;
@ -152,17 +145,13 @@ namespace fc {
if( !is_current() ) if( !is_current() )
{ {
auto t = my->boost_thread; auto t = my->boost_thread;
async( [=](){quit();}, "thread::quit" );//.wait(); async( [this](){quit();}, "thread::quit" );
if( t ) if( t )
{
//wlog("destroying boost thread ${tid}",("tid",(uintptr_t)my->boost_thread->native_handle()));
t->join(); t->join();
}
return; return;
} }
my->done = true; my->done = true;
// wlog( "${s}", ("s",name()) );
// We are quiting from our own thread... // We are quiting from our own thread...
// break all promises, thread quit! // break all promises, thread quit!
@ -173,20 +162,14 @@ namespace fc {
{ {
fc::context* n = cur->next; fc::context* n = cur->next;
// this will move the context into the ready list. // this will move the context into the ready list.
//cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) );
//cur->set_exception_on_blocking_promises( thread_quit() );
cur->set_exception_on_blocking_promises( std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) ); cur->set_exception_on_blocking_promises( std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) );
cur = n; cur = n;
} }
if( my->blocked ) if( my->blocked )
{
//wlog( "still blocking... whats up with that?");
debug( "on quit" ); debug( "on quit" );
} }
}
BOOST_ASSERT( my->blocked == 0 ); BOOST_ASSERT( my->blocked == 0 );
//my->blocked = 0;
for (task_base* unstarted_task : my->task_pqueue) for (task_base* unstarted_task : my->task_pqueue)
unstarted_task->set_exception(std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting"))); unstarted_task->set_exception(std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")));
@ -314,7 +297,6 @@ namespace fc {
if( p[i]->ready() ) if( p[i]->ready() )
return i; return i;
//BOOST_THROW_EXCEPTION( wait_any_error() );
return -1; return -1;
} }
@ -330,8 +312,6 @@ namespace fc {
void thread::async_task( task_base* t, const priority& p, const time_point& tp ) { void thread::async_task( task_base* t, const priority& p, const time_point& tp ) {
assert(my); assert(my);
t->_when = tp; t->_when = tp;
// slog( "when %lld", t->_when.time_since_epoch().count() );
// slog( "delay %lld", (tp - fc::time_point::now()).count() );
task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed); task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
do { t->_next = stale_head; do { t->_next = stale_head;
}while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) ); }while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );
@ -381,7 +361,6 @@ namespace fc {
if( !my->current ) if( !my->current )
my->current = new fc::context(&fc::thread::current()); my->current = new fc::context(&fc::thread::current());
//slog( " %1% blocking on %2%", my->current, p.get() );
my->current->add_blocking_promise(p.get(), true); my->current->add_blocking_promise(p.get(), true);
// if not max timeout, added to sleep pqueue // if not max timeout, added to sleep pqueue
@ -394,15 +373,10 @@ namespace fc {
sleep_priority_less() ); sleep_priority_less() );
} }
// elog( "blocking %1%", my->current );
my->add_to_blocked( my->current ); my->add_to_blocked( my->current );
// my->debug("swtiching fibers..." );
my->start_next_fiber(); my->start_next_fiber();
// slog( "resuming %1%", my->current );
//slog( " %1% unblocking blocking on %2%", my->current, p.get() );
my->current->remove_blocking_promise(p.get()); my->current->remove_blocking_promise(p.get());
my->check_fiber_exceptions(); my->check_fiber_exceptions();
@ -410,7 +384,6 @@ namespace fc {
void thread::notify( const promise_base::ptr& p ) void thread::notify( const promise_base::ptr& p )
{ {
//slog( "this %p my %p", this, my );
BOOST_ASSERT(p->ready()); BOOST_ASSERT(p->ready());
if( !is_current() ) if( !is_current() )
{ {
@ -473,7 +446,7 @@ namespace fc {
void thread::notify_task_has_been_canceled() void thread::notify_task_has_been_canceled()
{ {
async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); async( [this](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() );
} }
void thread::unblock(fc::context* c) void thread::unblock(fc::context* c)
@ -481,6 +454,26 @@ namespace fc {
my->unblock(c); my->unblock(c);
} }
namespace detail {
idle_guard::idle_guard( thread_d* t ) : notifier(t->notifier)
{
if( notifier )
{
task_base* work = notifier->idle();
if( work )
{
task_base* stale_head = t->task_in_queue.load(boost::memory_order_relaxed);
do {
work->_next = stale_head;
} while( !t->task_in_queue.compare_exchange_weak( stale_head, work, boost::memory_order_release ) );
}
}
}
idle_guard::~idle_guard()
{
if( notifier ) notifier->busy();
}
}
#ifdef _MSC_VER #ifdef _MSC_VER
/* support for providing a structured exception handler for async tasks */ /* support for providing a structured exception handler for async tasks */

View file

@ -15,12 +15,23 @@ namespace fc {
return a->resume_time > b->resume_time; return a->resume_time > b->resume_time;
} }
}; };
namespace detail {
class idle_guard {
public:
idle_guard( thread_d* t );
~idle_guard();
private:
thread_idle_notifier* notifier;
};
}
class thread_d { class thread_d {
public: public:
using context_pair = std::pair<thread_d*, fc::context*>; using context_pair = std::pair<thread_d*, fc::context*>;
thread_d(fc::thread& s) thread_d( fc::thread& s, thread_idle_notifier* n = 0 )
:self(s), boost_thread(0), :self(s), boost_thread(0),
task_in_queue(0), task_in_queue(0),
next_posted_num(1), next_posted_num(1),
@ -28,7 +39,8 @@ namespace fc {
current(0), current(0),
pt_head(0), pt_head(0),
blocked(0), blocked(0),
next_unused_task_storage_slot(0) next_unused_task_storage_slot(0),
notifier(n)
#ifndef NDEBUG #ifndef NDEBUG
,non_preemptable_scope_count(0) ,non_preemptable_scope_count(0)
#endif #endif
@ -98,6 +110,8 @@ namespace fc {
std::vector<detail::specific_data_info> non_task_specific_data; std::vector<detail::specific_data_info> non_task_specific_data;
unsigned next_unused_task_storage_slot; unsigned next_unused_task_storage_slot;
thread_idle_notifier *notifier;
#ifndef NDEBUG #ifndef NDEBUG
unsigned non_preemptable_scope_count; unsigned non_preemptable_scope_count;
#endif #endif
@ -585,6 +599,11 @@ namespace fc {
if( done ) if( done )
return; return;
detail::idle_guard guard( this );
if( task_in_queue.load(boost::memory_order_relaxed) )
continue;
if( timeout_time == time_point::maximum() ) if( timeout_time == time_point::maximum() )
task_ready.wait( lock ); task_ready.wait( lock );
else if( timeout_time != time_point::min() ) else if( timeout_time != time_point::min() )
@ -666,7 +685,7 @@ namespace fc {
{ {
if( fc::thread::current().my != this ) if( fc::thread::current().my != this )
{ {
self.async( [=](){ unblock(c); }, "thread_d::unblock" ); self.async( [this,c](){ unblock(c); }, "thread_d::unblock" );
return; return;
} }