diff --git a/include/fc/thread/priority.hpp b/include/fc/thread/priority.hpp index 90749ab..01fd61e 100644 --- a/include/fc/thread/priority.hpp +++ b/include/fc/thread/priority.hpp @@ -15,6 +15,7 @@ namespace fc { } static priority max() { return priority(10000); } static priority min() { return priority(-10000); } + static priority _internal__priority_for_short_sleeps() { return priority(-100000); } int value; }; } diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 60286a0..132833e 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -54,10 +54,6 @@ static void set_thread_name(const char* threadName) #endif namespace fc { -#ifdef ENABLE_FC_THREAD_DEBUG_LOG - FILE* thread_debug_log = fopen("C:/thread_debug.log", "w"); -#endif - const char* thread_name() { return thread::current().name().c_str(); } @@ -191,101 +187,123 @@ namespace fc { } // mark all ready tasks (should be everyone)... as canceled +#ifdef READY_LIST_IS_HEAP + for (fc::context* ready_context : my->ready_heap) + ready_context->canceled = true; +#else cur = my->ready_head; while( cur ) { cur->canceled = true; cur = cur->next; } +#endif my->done = true; // now that we have poked all fibers... switch to the next one and // let them all quit. - while( my->ready_head ) { +#ifdef READY_LIST_IS_HEAP + while (!my->ready_heap.empty()) + { my->start_next_fiber(true); my->check_for_timeouts(); } +#else + while (my->ready_head) + { + my->start_next_fiber(true); + my->check_for_timeouts(); + } +#endif my->clear_free_list(); my->cleanup_thread_specific_data(); } - void thread::exec() { - if( !my->current ) my->current = new fc::context(&fc::thread::current()); - try { + void thread::exec() + { + if( !my->current ) + my->current = new fc::context(&fc::thread::current()); + + try + { my->process_tasks(); } catch( canceled_exception& e ) { - wlog( "thread canceled: ${e}", ("e", e.to_detail_string()) ); + wlog( "thread canceled: ${e}", ("e", e.to_detail_string()) ); } delete my->current; my->current = 0; } - bool thread::is_running()const { + bool thread::is_running()const + { return !my->done; } - priority thread::current_priority()const { + priority thread::current_priority()const + { BOOST_ASSERT(my); - if( my->current ) return my->current->prio; + if( my->current ) + return my->current->prio; return priority(); } - void thread::yield(bool reschedule ) { + void thread::yield(bool reschedule) + { my->check_fiber_exceptions(); my->start_next_fiber(reschedule); my->check_fiber_exceptions(); } - void thread::sleep_until( const time_point& tp ) { - if( tp <= (time_point::now()+fc::microseconds(10000)) ) - { - this->yield(true); - } - my->yield_until( tp, false ); + void thread::sleep_until( const time_point& tp ) + { + if( tp <= (time_point::now()+fc::microseconds(10000)) ) + yield(true); + my->yield_until( tp, false ); } int thread::wait_any_until( std::vector&& p, const time_point& timeout) { - for( size_t i = 0; i < p.size(); ++i ) { - if( p[i]->ready() ) return i; - } + for( size_t i = 0; i < p.size(); ++i ) + if( p[i]->ready() ) + return i; - if( timeout < time_point::now() ) { - fc::stringstream ss; - for( auto i = p.begin(); i != p.end(); ++i ) { - ss << (*i)->get_desc() <<", "; - } - FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) ); + if( timeout < time_point::now() ) + { + fc::stringstream ss; + for( auto i = p.begin(); i != p.end(); ++i ) + ss << (*i)->get_desc() << ", "; + + FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) ); } - if( !my->current ) { + if( !my->current ) my->current = new fc::context(&fc::thread::current()); - } - for( uint32_t i = 0; i < p.size(); ++i ) { - my->current->add_blocking_promise(p[i].get(),false); - }; + for( uint32_t i = 0; i < p.size(); ++i ) + my->current->add_blocking_promise(p[i].get(),false); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { + if( timeout != time_point::maximum() ) + { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), sleep_priority_less() ); } + my->add_to_blocked( my->current ); my->start_next_fiber(); - for( auto i = p.begin(); i != p.end(); ++i ) { - my->current->remove_blocking_promise(i->get()); - } + for( auto i = p.begin(); i != p.end(); ++i ) + my->current->remove_blocking_promise(i->get()); my->check_fiber_exceptions(); - for( uint32_t i = 0; i < p.size(); ++i ) { - if( p[i]->ready() ) return i; - } + for( uint32_t i = 0; i < p.size(); ++i ) + if( p[i]->ready() ) + return i; + //BOOST_THROW_EXCEPTION( wait_any_error() ); return -1; } @@ -327,35 +345,43 @@ namespace fc { thread::current().sleep_until(tp); } - void exec() { + void exec() + { return thread::current().exec(); } - int wait_any( std::vector&& v, const microseconds& timeout_us ) { + int wait_any( std::vector&& v, const microseconds& timeout_us ) + { return thread::current().wait_any_until( fc::move(v), time_point::now() + timeout_us ); } - int wait_any_until( std::vector&& v, const time_point& tp ) { + + int wait_any_until( std::vector&& v, const time_point& tp ) + { return thread::current().wait_any_until( fc::move(v), tp ); } - void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) { - if( p->ready() ) return; + + void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) + { + if( p->ready() ) + return; + if( timeout < time_point::now() ) - FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) ); + FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) ); - if( !my->current ) { + if( !my->current ) my->current = new fc::context(&fc::thread::current()); - } //slog( " %1% blocking on %2%", my->current, p.get() ); - my->current->add_blocking_promise(p.get(),true); + my->current->add_blocking_promise(p.get(), true); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { + if( timeout != time_point::maximum() ) + { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), - sleep_priority_less() ); + sleep_priority_less() ); } // elog( "blocking %1%", my->current ); @@ -372,10 +398,12 @@ namespace fc { my->check_fiber_exceptions(); } - void thread::notify( const promise_base::ptr& p ) { + void thread::notify( const promise_base::ptr& p ) + { //slog( "this %p my %p", this, my ); BOOST_ASSERT(p->ready()); - if( !is_current() ) { + if( !is_current() ) + { this->async( [=](){ notify(p); }, "notify", priority::max() ); return; } @@ -387,14 +415,18 @@ namespace fc { fc::context* cur_blocked = my->blocked; fc::context* prev_blocked = 0; - while( cur_blocked ) { + while( cur_blocked ) + { // if the blocked context is waiting on this promise - if( cur_blocked->try_unblock( p.get() ) ) { + if( cur_blocked->try_unblock( p.get() ) ) + { // remove it from the blocked list. // remove this context from the sleep queue... - for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { - if( my->sleep_pqueue[i] == cur_blocked ) { + for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) + { + if( my->sleep_pqueue[i] == cur_blocked ) + { my->sleep_pqueue[i]->blocking_prom.clear(); my->sleep_pqueue[i] = my->sleep_pqueue.back(); my->sleep_pqueue.pop_back(); @@ -403,22 +435,29 @@ namespace fc { } } auto cur = cur_blocked; - if( prev_blocked ) { + if( prev_blocked ) + { prev_blocked->next_blocked = cur_blocked->next_blocked; cur_blocked = prev_blocked->next_blocked; - } else { + } + else + { my->blocked = cur_blocked->next_blocked; cur_blocked = my->blocked; } cur->next_blocked = 0; my->add_context_to_ready_list( cur ); - } else { // goto the next blocked task + } + else + { // goto the next blocked task prev_blocked = cur_blocked; cur_blocked = cur_blocked->next_blocked; } } } - bool thread::is_current()const { + + bool thread::is_current()const + { return this == ¤t(); } diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 0fd8eb8..8b84ee7 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -8,17 +8,9 @@ #include #include //#include - -//#define ENABLE_FC_THREAD_DEBUG_LOG -#ifdef ENABLE_FC_THREAD_DEBUG_LOG -# define thread_debug_msg(x) do { fprintf x; } while (0) -#else -# define thread_debug_msg(x) do {} while (0) -#endif +#define READY_LIST_IS_HEAP namespace fc { - extern FILE* thread_debug_log; - struct sleep_priority_less { bool operator()( const context::ptr& a, const context::ptr& b ) { return a->resume_time > b->resume_time; @@ -34,8 +26,10 @@ namespace fc { done(false), current(0), pt_head(0), +#ifndef READY_LIST_IS_HEAP ready_head(0), ready_tail(0), +#endif blocked(0), next_unused_task_storage_slot(0) #ifndef NDEBUG @@ -51,12 +45,18 @@ namespace fc { { delete current; fc::context* temp; +#ifdef READY_LIST_IS_HEAP + for (fc::context* ready_context : ready_heap) + delete ready_context; + ready_heap.clear(); +#else while (ready_head) { temp = ready_head->next; delete ready_head; ready_head = temp; } +#endif while (blocked) { temp = blocked->next; @@ -98,8 +98,12 @@ namespace fc { fc::context* pt_head; // list of contexts that can be reused for new tasks +#ifdef READY_LIST_IS_HEAP + std::vector ready_heap; +#else fc::context* ready_head; // linked list (using 'next') of contexts that are ready to run fc::context* ready_tail; +#endif fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait() @@ -182,68 +186,96 @@ namespace fc { */ } - fc::context::ptr ready_pop_front() - { - fc::context::ptr tmp = nullptr; - if( ready_head ) - { - tmp = ready_head; - ready_head = tmp->next; - if( !ready_head ) - ready_tail = nullptr; - tmp->next = nullptr; - } - return tmp; - } - - void add_context_to_ready_list(context* context_to_add) - { - if (!ready_tail) - ready_head = context_to_add; - else - { - context_to_add->context_posted_num = ++next_posted_num; - ready_tail->next = context_to_add; - } - ready_tail = context_to_add; - } - -#if 0 - void ready_push_front(const fc::context::ptr& context_to_push) + fc::context::ptr ready_pop_front() { - BOOST_ASSERT(context_to_push->next == nullptr); - BOOST_ASSERT(context_to_push != current); - - context** iter = &ready_head; - while (*iter && (*iter)->resume_time > context_to_push->resume_time) - iter = &((*iter)->next); - context_to_push->next = *iter; - *iter = context_to_push; - if (!context_to_push->next) - ready_tail = context_to_push; - } - - void ready_push_back(const fc::context::ptr& context_to_push) - { - BOOST_ASSERT(context_to_push->next == nullptr); - BOOST_ASSERT(context_to_push != current); - - if (!ready_tail) - ready_head = context_to_push; - else +#ifdef READY_LIST_IS_HEAP + fc::context* highest_priority_context = ready_heap.front(); + std::pop_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); + ready_heap.pop_back(); + return highest_priority_context; +#else + fc::context::ptr tmp = nullptr; + if( ready_head ) { - if (context_to_push->resume_time <= ready_tail->resume_time) - context_to_push->resume_time = ready_tail->resume_time + fc::microseconds(1); - ready_tail->next = context_to_push; + tmp = ready_head; + ready_head = tmp->next; + if( !ready_head ) + ready_tail = nullptr; + tmp->next = nullptr; } - ready_tail = context_to_push; - } + return tmp; #endif + } + + void add_context_to_ready_list(context* context_to_add, bool at_end = false) + { + +#ifdef READY_LIST_IS_HEAP + context_to_add->context_posted_num = next_posted_num++; + ready_heap.push_back(context_to_add); + std::push_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); +#else +# if 1 + if (at_end) + { + if (!ready_tail) + { + ready_head = context_to_add; + context_to_add->context_posted_num = next_posted_num + 100000; + } + else + { + context_to_add->context_posted_num = next_posted_num++; + ready_tail->next = context_to_add; + } + ready_tail = context_to_add; + } + else + { + context_to_add->context_posted_num = next_posted_num++; + context_to_add->next = ready_head; + ready_head = context_to_add; + if (!ready_tail) + ready_tail = context_to_add; + } +# else + if (!ready_tail) + ready_head = context_to_add; + else + { + context_to_add->context_posted_num = next_posted_num++; + ready_tail->next = context_to_add; + } + ready_tail = context_to_add; +# endif +#endif + } + struct task_priority_less { - bool operator()( task_base* a, task_base* b ) + bool operator()(const task_base* a, const task_base* b) const { - return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num); + return a->_prio.value < b->_prio.value ? true : + (a->_prio.value > b->_prio.value ? false : + a->_posted_num > b->_posted_num); + } + bool operator()(const task_base* a, const context* b) const + { + return a->_prio.value < b->prio.value ? true : + (a->_prio.value > b->prio.value ? false : + a->_posted_num > b->context_posted_num); + } + bool operator()(const context* a, const task_base* b) const + { + return a->prio.value < b->_prio.value ? true : + (a->prio.value > b->_prio.value ? false : + a->context_posted_num > b->_posted_num); + } + bool operator()(const context* a, const context* b) const + { + return a->prio.value < b->prio.value ? true : + (a->prio.value > b->prio.value ? false : + a->context_posted_num > b->context_posted_num); } }; @@ -406,29 +438,34 @@ namespace fc { if( !current ) current = new fc::context( &fc::thread::current() ); + priority original_priority = current->prio; + // check to see if any other contexts are ready - if( ready_head ) +#ifdef READY_LIST_IS_HEAP + if (!ready_heap.empty()) +#else + if (ready_head) +#endif { fc::context* next = ready_pop_front(); - if( next == current ) + if (next == current) { // elog( "next == current... something went wrong" ); assert(next != current); return false; } - BOOST_ASSERT( next != current ); + BOOST_ASSERT(next != current); // jump to next context, saving current context fc::context* prev = current; current = next; if (reschedule) - add_context_to_ready_list(prev); + { + current->prio = priority::_internal__priority_for_short_sleeps(); + add_context_to_ready_list(prev, true); + } // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); - thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", - name.c_str(), - prev->cur_task ? prev->cur_task->get_desc() : "unknown", - next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); #elif BOOST_VERSION >= 105300 @@ -464,14 +501,13 @@ namespace fc { current = next; if( reschedule ) - add_context_to_ready_list(prev); + { + current->prio = priority::_internal__priority_for_short_sleeps(); + add_context_to_ready_list(prev, true); + } // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); - thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", - name.c_str(), - prev->cur_task ? prev->cur_task->get_desc() : "unknown", - next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); #elif BOOST_VERSION >= 105300 @@ -484,6 +520,9 @@ namespace fc { //current = prev; } + if (reschedule) + current->prio = original_priority; + if( current->canceled ) { //current->canceled = false; @@ -525,9 +564,6 @@ namespace fc { next->_set_active_context( current ); current->cur_task = next; - - thread_debug_msg((thread_debug_log, "EMF: [%s] starting task \"%s\"\n", name.c_str(), next->get_desc())); - next->run(); current->cur_task = 0; next->_set_active_context(0); @@ -563,10 +599,20 @@ namespace fc { if (!task_pqueue.empty()) { - if (ready_head) +#if 1 + if (task_pqueue.front()->_prio.value != priority::max().value && +#ifdef READY_LIST_IS_HEAP + !ready_heap.empty()) +#else + ready_head) +#endif { // a new task and an existing task are both ready to go +#ifdef READY_LIST_IS_HEAP + if (task_priority_less()(ready_heap.front(), task_pqueue.front())) +#else if (ready_head->context_posted_num < task_pqueue.front()->_posted_num) +#endif { // run the existing task first pt_push_back(current); @@ -574,16 +620,21 @@ namespace fc { continue; } } +#endif // if we made it here, either there's no ready context, or the ready context is - // scheduled after the ready task, so we shoudl run the task first + // scheduled after the ready task, so we should run the task first run_next_task(); continue; } // if I have something else to do other than // process tasks... do it. - if( ready_head ) +#ifdef READY_LIST_IS_HEAP + if (!ready_heap.empty()) +#else + if (ready_head) +#endif { pt_push_back( current ); start_next_fiber(false); @@ -597,14 +648,14 @@ namespace fc { { // lock scope boost::unique_lock lock(task_ready_mutex); - if( has_next_task() ) continue; + if( has_next_task() ) + continue; time_point timeout_time = check_for_timeouts(); - if( done ) return; + if( done ) + return; if( timeout_time == time_point::maximum() ) - { task_ready.wait( lock ); - } else if( timeout_time != time_point::min() ) { // there may be tasks that have been canceled we should filter them out now @@ -647,9 +698,9 @@ namespace fc { } time_point next = time_point::maximum(); - if( sleep_pqueue.size() && next > sleep_pqueue.front()->resume_time ) + if( !sleep_pqueue.empty() && next > sleep_pqueue.front()->resume_time ) next = sleep_pqueue.front()->resume_time; - if( task_sch_queue.size() && next > task_sch_queue.front()->_when ) + if( !task_sch_queue.empty() && next > task_sch_queue.front()->_when ) next = task_sch_queue.front()->_when; time_point now = time_point::now(); @@ -795,6 +846,10 @@ namespace fc { { if ((*sleep_iter)->canceled) { +#ifdef READY_LIST_IS_HEAP + bool already_on_ready_list = std::find(ready_heap.begin(), ready_heap.end(), + *sleep_iter) != ready_heap.end(); +#else bool already_on_ready_list = false; for (fc::context* ready_iter = ready_head; ready_iter; ready_iter = ready_iter->next) if (ready_iter == *sleep_iter) @@ -802,6 +857,7 @@ namespace fc { already_on_ready_list = true; break; } +#endif if (!already_on_ready_list) add_context_to_ready_list(*sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter);