#include #include #include "thread_d.hpp" namespace fc { const char* thread_name() { return thread::current().name().c_str(); } void* thread_ptr() { return &thread::current(); } boost::mutex& log_mutex() { static boost::mutex m; return m; } thread*& current_thread() { #ifdef _MSC_VER static __declspec(thread) thread* t = NULL; #else static __thread thread* t = NULL; #endif return t; } thread::thread( const char* name ) { promise::ptr p(new promise()); boost::thread* t = new boost::thread( [this,p,name]() { try { this->my = new thread_d(*this); current_thread() = this; p->set_value(); exec(); } catch ( ... ) { elog( "Caught unhandled exception %1%", boost::current_exception_diagnostic_information() ); } delete this->my; this->my = 0; } ); p->wait(); my->boost_thread = t; set_name(name); } thread::thread( thread_d* ) { my = new thread_d(*this); } thread::thread( thread&& m ) { my = m.my; m.my = 0; } thread& thread::operator=(thread&& t ) { fc::swap(t.my,my); return *this; } thread::~thread() { if( is_current() ) { delete my; my = 0; } } thread& thread::current() { if( !current_thread() ) current_thread() = new thread((thread_d*)0); return *current_thread(); } const string& thread::name()const { return my->name; } void thread::set_name( const fc::string& n ) { my->name = n; } void thread::debug( const fc::string& d ) { my->debug(d); } void thread::quit() { if( ¤t() != this ) { async( [=](){quit();} );//.wait(); if( my->boost_thread ) { auto n = name(); slog( "joining... %s", n.c_str() ); my->boost_thread->join(); slog( "done joining... %s", n.c_str() ); } return; } wlog( "%s", my->name.c_str() ); // break all promises, thread quit! fc::context* cur = my->blocked; while( cur ) { fc::context* n = cur->next; // this will move the context into the ready list. //cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) ); cur->except_blocking_promises( thread_quit() ); cur = n; } BOOST_ASSERT( my->blocked == 0 ); //my->blocked = 0; // move all sleep tasks to ready for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { my->ready_push_front( my->sleep_pqueue[i] ); } my->sleep_pqueue.clear(); // move all idle tasks to ready cur = my->pt_head; while( cur ) { fc::context* n = cur->next; cur->next = 0; my->ready_push_front( cur ); cur = n; } // mark all ready tasks (should be everyone)... as canceled cur = my->ready_head; while( cur ) { cur->canceled = true; cur = cur->next; } my->done = true; // now that we have poked all fibers... switch to the next one and // let them all quit. while( my->ready_head ) { my->start_next_fiber(true); my->check_for_timeouts(); } my->clear_free_list(); } void thread::exec() { if( !my->current ) my->current = new fc::context(&fc::thread::current()); my->process_tasks(); delete my->current; my->current = 0; } bool thread::is_running()const { return !my->done; } priority thread::current_priority()const { BOOST_ASSERT(my); if( my->current ) return my->current->prio; return priority(); } void thread::yield(bool reschedule ) { my->check_fiber_exceptions(); my->start_next_fiber(reschedule); my->check_fiber_exceptions(); } void thread::sleep_until( const time_point& tp ) { my->check_fiber_exceptions(); BOOST_ASSERT( ¤t() == this ); if( !my->current ) { my->current = new fc::context(&fc::thread::current()); } my->current->resume_time = tp; my->current->clear_blocking_promises(); my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), sleep_priority_less() ); my->start_next_fiber(); my->current->resume_time = time_point::max(); my->check_fiber_exceptions(); } int thread::wait_any_until( fc::vector&& p, const time_point& timeout) { for( size_t i = 0; i < p.size(); ++i ) { if( p[i]->ready() ) return i; } if( timeout < time_point::now() ) BOOST_THROW_EXCEPTION( future_wait_timeout() ); if( !my->current ) { my->current = new fc::context(&fc::thread::current()); } for( uint32_t i = 0; i < p.size(); ++i ) { my->current->add_blocking_promise(p[i].get(),false); }; // if not max timeout, added to sleep pqueue if( timeout != time_point::max() ) { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), sleep_priority_less() ); } my->add_to_blocked( my->current ); my->start_next_fiber(); for( auto i = p.begin(); i != p.end(); ++i ) { my->current->remove_blocking_promise(i->get()); } my->check_fiber_exceptions(); for( uint32_t i = 0; i < p.size(); ++i ) { if( p[i]->ready() ) return i; } BOOST_THROW_EXCEPTION( wait_any_error() ); return -1; } void thread::async_task( task_base* t, const priority& p, const char* desc ) { async_task( t, p, time_point::max(), desc ); } void thread::poke() { boost::unique_lock lock(my->task_ready_mutex); my->task_ready.notify_one(); } void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) { task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed); do { t->_next = stale_head; }while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) ); // Because only one thread can post the 'first task', only that thread will attempt // to aquire the lock and therefore there should be no contention on this lock except // when *this thread is about to block on a wait condition. if( this != ¤t() && !stale_head ) { boost::unique_lock lock(my->task_ready_mutex); my->task_ready.notify_one(); } } void yield() { thread::current().yield(); } void usleep( const microseconds& u ) { thread::current().sleep_until( time_point::now() + u); } void sleep_until( const time_point& tp ) { thread::current().sleep_until(tp); } void exec() { return thread::current().exec(); } int wait_any( fc::vector&& v, const microseconds& timeout_us ) { return thread::current().wait_any_until( fc::move(v), time_point::now() + timeout_us ); } int wait_any_until( fc::vector&& v, const time_point& tp ) { return thread::current().wait_any_until( fc::move(v), tp ); } void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) { if( p->ready() ) return; if( timeout < time_point::now() ) BOOST_THROW_EXCEPTION( future_wait_timeout() ); if( !my->current ) { my->current = new fc::context(&fc::thread::current()); } //slog( " %1% blocking on %2%", my->current, p.get() ); my->current->add_blocking_promise(p.get(),true); // if not max timeout, added to sleep pqueue if( timeout != time_point::max() ) { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), sleep_priority_less() ); } // elog( "blocking %1%", my->current ); my->add_to_blocked( my->current ); // my->debug("swtiching fibers..." ); my->start_next_fiber(); // slog( "resuming %1%", my->current ); //slog( " %1% unblocking blocking on %2%", my->current, p.get() ); my->current->remove_blocking_promise(p.get()); my->check_fiber_exceptions(); } void thread::notify( const promise_base::ptr& p ) { BOOST_ASSERT(p->ready()); if( !is_current() ) { this->async( [=](){ notify(p); } ); return; } // TODO: store a list of blocked contexts with the promise // to accelerate the lookup.... unless it introduces contention... // iterate over all blocked contexts fc::context* cur_blocked = my->blocked; fc::context* prev_blocked = 0; while( cur_blocked ) { // if the blocked context is waiting on this promise if( cur_blocked->try_unblock( p.get() ) ) { // remove it from the blocked list. // remove this context from the sleep queue... for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { if( my->sleep_pqueue[i] == cur_blocked ) { my->sleep_pqueue[i]->blocking_prom.clear(); my->sleep_pqueue[i] = my->sleep_pqueue.back(); my->sleep_pqueue.pop_back(); std::make_heap( my->sleep_pqueue.begin(),my->sleep_pqueue.end(), sleep_priority_less() ); break; } } auto cur = cur_blocked; if( prev_blocked ) { prev_blocked->next_blocked = cur_blocked->next_blocked; cur_blocked = prev_blocked->next_blocked; } else { my->blocked = cur_blocked->next_blocked; cur_blocked = my->blocked; } cur->next_blocked = 0; my->ready_push_front( cur ); } else { // goto the next blocked task prev_blocked = cur_blocked; cur_blocked = cur_blocked->next_blocked; } } } bool thread::is_current()const { return this == ¤t(); } }