From 9d547427417b8ad2cbe7917600a043c89dbc6d0e Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Wed, 3 Oct 2018 10:47:02 +0200 Subject: [PATCH] Added a mechanism to get notifications when a thread is idle --- include/fc/thread/task.hpp | 2 ++ include/fc/thread/thread.hpp | 20 ++++++++++- src/thread/thread.cpp | 65 ++++++++++++++++-------------------- src/thread/thread_d.hpp | 25 ++++++++++++-- 4 files changed, 72 insertions(+), 40 deletions(-) diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index dd52b77..6c79d71 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -25,6 +25,7 @@ namespace fc { }; void* get_task_specific_data(unsigned slot); void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + class idle_guard; } class task_base : virtual public promise_base { @@ -53,6 +54,7 @@ namespace fc { // thread/thread_private friend class thread; friend class thread_d; + friend class detail::idle_guard; fwd _spinlock; // avoid rtti info for every possible functor... diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index 933832f..bbd8317 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -12,6 +12,7 @@ namespace fc { namespace detail { + class pool_impl; void* get_thread_specific_data(unsigned slot); void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); unsigned get_next_unused_task_storage_slot(); @@ -19,9 +20,25 @@ namespace fc { void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); } + /** Instances of this class can be used to get notifications when a thread is + * (or is no longer) idle. + */ + class thread_idle_notifier { + public: + /** This method is called when the thread is idle. If it returns a + * task_base it will be queued and executed immediately. + * @return a task to execute, or nullptr + */ + virtual task_base* idle() = 0; + /** This method is called when the thread is no longer idle, e. g. after + * it has woken up due to a timer or signal. + */ + virtual void busy() = 0; + }; + class thread { public: - thread( const std::string& name = "" ); + thread( const std::string& name = "", thread_idle_notifier* notifier = 0 ); thread( thread&& m ) = delete; thread& operator=(thread&& t ) = delete; @@ -135,6 +152,7 @@ namespace fc { friend class task_base; friend class thread_d; friend class mutex; + friend class detail::pool_impl; friend void* detail::get_thread_specific_data(unsigned slot); friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); friend unsigned detail::get_next_unused_task_storage_slot(); diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index dd1dbdc..73fa303 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -62,20 +62,20 @@ namespace fc { } thread*& current_thread() { - #ifdef _MSC_VER +#ifdef _MSC_VER static __declspec(thread) thread* t = NULL; - #else +#else static __thread thread* t = NULL; - #endif +#endif return t; } - thread::thread( const std::string& name ) { + thread::thread( const std::string& name, thread_idle_notifier* notifier ) { promise::ptr p(new promise("thread start")); - boost::thread* t = new boost::thread( [this,p,name]() { + boost::thread* t = new boost::thread( [this,p,name,notifier]() { try { set_thread_name(name.c_str()); // set thread's name for the debugger to display - this->my = new thread_d(*this); + this->my = new thread_d( *this, notifier ); current_thread() = this; p->set_value(); exec(); @@ -85,26 +85,19 @@ namespace fc { } catch ( ... ) { wlog( "unhandled exception" ); p->set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); - //assert( !"unhandled exception" ); - //elog( "Caught unhandled exception %s", boost::current_exception_diagnostic_information().c_str() ); } } ); p->wait(); my->boost_thread = t; my->name = name; - //wlog("name:${n} tid:${tid}", ("n", name)("tid", (uintptr_t)my->boost_thread->native_handle()) ); } thread::thread( thread_d* ) { my = new thread_d(*this); } thread::~thread() { - //wlog( "my ${n}", ("n",name()) ); if( my ) - { - // wlog( "calling quit() on ${n}",("n",my->name) ); quit(); - } delete my; } @@ -129,7 +122,7 @@ namespace fc { { if (!is_current()) { - async([=](){ set_name(n); }, "set_name").wait(); + async([this,n](){ set_name(n); }, "set_name").wait(); return; } my->name = n; @@ -152,17 +145,13 @@ namespace fc { if( !is_current() ) { auto t = my->boost_thread; - async( [=](){quit();}, "thread::quit" );//.wait(); + async( [this](){quit();}, "thread::quit" ); if( t ) - { - //wlog("destroying boost thread ${tid}",("tid",(uintptr_t)my->boost_thread->native_handle())); t->join(); - } return; } my->done = true; - // wlog( "${s}", ("s",name()) ); // We are quiting from our own thread... // break all promises, thread quit! @@ -173,20 +162,14 @@ namespace fc { { fc::context* n = cur->next; // this will move the context into the ready list. - //cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) ); - //cur->set_exception_on_blocking_promises( thread_quit() ); cur->set_exception_on_blocking_promises( std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) ); cur = n; } if( my->blocked ) - { - //wlog( "still blocking... whats up with that?"); debug( "on quit" ); - } } BOOST_ASSERT( my->blocked == 0 ); - //my->blocked = 0; for (task_base* unstarted_task : my->task_pqueue) unstarted_task->set_exception(std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting"))); @@ -314,7 +297,6 @@ namespace fc { if( p[i]->ready() ) return i; - //BOOST_THROW_EXCEPTION( wait_any_error() ); return -1; } @@ -330,8 +312,6 @@ namespace fc { void thread::async_task( task_base* t, const priority& p, const time_point& tp ) { assert(my); t->_when = tp; - // slog( "when %lld", t->_when.time_since_epoch().count() ); - // slog( "delay %lld", (tp - fc::time_point::now()).count() ); task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed); do { t->_next = stale_head; }while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) ); @@ -381,7 +361,6 @@ namespace fc { if( !my->current ) my->current = new fc::context(&fc::thread::current()); - //slog( " %1% blocking on %2%", my->current, p.get() ); my->current->add_blocking_promise(p.get(), true); // if not max timeout, added to sleep pqueue @@ -394,15 +373,10 @@ namespace fc { sleep_priority_less() ); } - // elog( "blocking %1%", my->current ); my->add_to_blocked( my->current ); - // my->debug("swtiching fibers..." ); - my->start_next_fiber(); - // slog( "resuming %1%", my->current ); - //slog( " %1% unblocking blocking on %2%", my->current, p.get() ); my->current->remove_blocking_promise(p.get()); my->check_fiber_exceptions(); @@ -410,7 +384,6 @@ namespace fc { void thread::notify( const promise_base::ptr& p ) { - //slog( "this %p my %p", this, my ); BOOST_ASSERT(p->ready()); if( !is_current() ) { @@ -473,7 +446,7 @@ namespace fc { void thread::notify_task_has_been_canceled() { - async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); + async( [this](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); } void thread::unblock(fc::context* c) @@ -481,6 +454,26 @@ namespace fc { my->unblock(c); } + namespace detail { + idle_guard::idle_guard( thread_d* t ) : notifier(t->notifier) + { + if( notifier ) + { + task_base* work = notifier->idle(); + if( work ) + { + task_base* stale_head = t->task_in_queue.load(boost::memory_order_relaxed); + do { + work->_next = stale_head; + } while( !t->task_in_queue.compare_exchange_weak( stale_head, work, boost::memory_order_release ) ); + } + } + } + idle_guard::~idle_guard() + { + if( notifier ) notifier->busy(); + } + } #ifdef _MSC_VER /* support for providing a structured exception handler for async tasks */ diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 6ad9421..d07e3f9 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -15,12 +15,23 @@ namespace fc { return a->resume_time > b->resume_time; } }; + + namespace detail { + class idle_guard { + public: + idle_guard( thread_d* t ); + ~idle_guard(); + private: + thread_idle_notifier* notifier; + }; + } + class thread_d { public: using context_pair = std::pair; - thread_d(fc::thread& s) + thread_d( fc::thread& s, thread_idle_notifier* n = 0 ) :self(s), boost_thread(0), task_in_queue(0), next_posted_num(1), @@ -28,7 +39,8 @@ namespace fc { current(0), pt_head(0), blocked(0), - next_unused_task_storage_slot(0) + next_unused_task_storage_slot(0), + notifier(n) #ifndef NDEBUG ,non_preemptable_scope_count(0) #endif @@ -98,6 +110,8 @@ namespace fc { std::vector non_task_specific_data; unsigned next_unused_task_storage_slot; + thread_idle_notifier *notifier; + #ifndef NDEBUG unsigned non_preemptable_scope_count; #endif @@ -585,6 +599,11 @@ namespace fc { if( done ) return; + + detail::idle_guard guard( this ); + if( task_in_queue.load(boost::memory_order_relaxed) ) + continue; + if( timeout_time == time_point::maximum() ) task_ready.wait( lock ); else if( timeout_time != time_point::min() ) @@ -666,7 +685,7 @@ namespace fc { { if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); }, "thread_d::unblock" ); + self.async( [this,c](){ unblock(c); }, "thread_d::unblock" ); return; }