From 1af4ac6a5cd4e96993e0f42dd62da45fbe15ceff Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Tue, 14 Oct 2014 14:21:42 -0400 Subject: [PATCH] Schedule fibers in the order they are asynced or unblocked. Earlier behavior was always to start newly-asycned tasks before resuming existing tasks, so existing tasks could be starved if there was a steady stream of new tasks created. Now all tasks are started or resumed in the order they are created or unblocked. --- src/thread/context.hpp | 7 +- src/thread/thread.cpp | 12 +- src/thread/thread_d.hpp | 247 +++++++++++++++++++++++++--------------- 3 files changed, 167 insertions(+), 99 deletions(-) diff --git a/src/thread/context.hpp b/src/thread/context.hpp index 9801499..f7d15e9 100644 --- a/src/thread/context.hpp +++ b/src/thread/context.hpp @@ -60,7 +60,8 @@ namespace fc { cancellation_reason(nullptr), #endif complete(false), - cur_task(0) + cur_task(0), + context_posted_num(0) { #if BOOST_VERSION >= 105600 size_t stack_size = stack_allocator::traits_type::default_size() * 4; @@ -99,7 +100,8 @@ namespace fc { cancellation_reason(nullptr), #endif complete(false), - cur_task(0) + cur_task(0), + context_posted_num(0) {} ~context() { @@ -229,6 +231,7 @@ namespace fc { #endif bool complete; task_base* cur_task; + uint64_t context_posted_num; // serial number set each tiem the context is added to the ready list }; } // naemspace fc diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index ea12d24..60286a0 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -54,6 +54,10 @@ static void set_thread_name(const char* threadName) #endif namespace fc { +#ifdef ENABLE_FC_THREAD_DEBUG_LOG + FILE* thread_debug_log = fopen("C:/thread_debug.log", "w"); +#endif + const char* thread_name() { return thread::current().name().c_str(); } @@ -173,7 +177,7 @@ namespace fc { // move all sleep tasks to ready for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { - my->ready_push_front( my->sleep_pqueue[i] ); + my->add_context_to_ready_list( my->sleep_pqueue[i] ); } my->sleep_pqueue.clear(); @@ -182,7 +186,7 @@ namespace fc { while( cur ) { fc::context* n = cur->next; cur->next = 0; - my->ready_push_front( cur ); + my->add_context_to_ready_list( cur ); cur = n; } @@ -287,7 +291,7 @@ namespace fc { } void thread::async_task( task_base* t, const priority& p ) { - async_task( t, p, time_point::min() ); + async_task( t, p, time_point::min() ); } void thread::poke() { @@ -407,7 +411,7 @@ namespace fc { cur_blocked = my->blocked; } cur->next_blocked = 0; - my->ready_push_front( cur ); + my->add_context_to_ready_list( cur ); } else { // goto the next blocked task prev_blocked = cur_blocked; cur_blocked = cur_blocked->next_blocked; diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index eda1c74..0fd8eb8 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -9,7 +9,16 @@ #include //#include +//#define ENABLE_FC_THREAD_DEBUG_LOG +#ifdef ENABLE_FC_THREAD_DEBUG_LOG +# define thread_debug_msg(x) do { fprintf x; } while (0) +#else +# define thread_debug_msg(x) do {} while (0) +#endif + namespace fc { + extern FILE* thread_debug_log; + struct sleep_priority_less { bool operator()( const context::ptr& a, const context::ptr& b ) { return a->resume_time > b->resume_time; @@ -21,7 +30,7 @@ namespace fc { thread_d(fc::thread& s) :self(s), boost_thread(0), task_in_queue(0), - next_task_posted_num(1), + next_posted_num(1), done(false), current(0), pt_head(0), @@ -78,7 +87,7 @@ namespace fc { boost::atomic task_in_queue; std::vector task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time - uint64_t next_task_posted_num; // each task gets assigned a number in the order it is started, tracked here + uint64_t next_posted_num; // each task or context gets assigned a number in the order it is ready to execute, tracked here std::vector task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run std::vector sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume std::vector free_list; // list of unused contexts that are ready for deletion @@ -187,50 +196,64 @@ namespace fc { return tmp; } - void ready_push_front( const fc::context::ptr& c ) + void add_context_to_ready_list(context* context_to_add) { - BOOST_ASSERT( c->next == nullptr ); - BOOST_ASSERT( c != current ); - // if( c == current ) - // wlog( "pushing current to ready??" ); - c->next = ready_head; - ready_head = c; - if( !ready_tail ) - ready_tail = c; + if (!ready_tail) + ready_head = context_to_add; + else + { + context_to_add->context_posted_num = ++next_posted_num; + ready_tail->next = context_to_add; + } + ready_tail = context_to_add; } - void ready_push_back( const fc::context::ptr& c ) - { - BOOST_ASSERT( c->next == nullptr ); - BOOST_ASSERT( c != current ); - // if( c == current ) - // wlog( "pushing current to ready??" ); - c->next = 0; - if( ready_tail ) - ready_tail->next = c; - else - { - assert( !ready_head ); - ready_head = c; - } - ready_tail = c; - } - - struct task_priority_less - { - bool operator()( task_base* a, task_base* b ) - { - return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num ); - } - }; +#if 0 + void ready_push_front(const fc::context::ptr& context_to_push) + { + BOOST_ASSERT(context_to_push->next == nullptr); + BOOST_ASSERT(context_to_push != current); - struct task_when_less - { - bool operator()( task_base* a, task_base* b ) - { - return a->_when > b->_when; - } - }; + context** iter = &ready_head; + while (*iter && (*iter)->resume_time > context_to_push->resume_time) + iter = &((*iter)->next); + context_to_push->next = *iter; + *iter = context_to_push; + if (!context_to_push->next) + ready_tail = context_to_push; + } + + void ready_push_back(const fc::context::ptr& context_to_push) + { + BOOST_ASSERT(context_to_push->next == nullptr); + BOOST_ASSERT(context_to_push != current); + + if (!ready_tail) + ready_head = context_to_push; + else + { + if (context_to_push->resume_time <= ready_tail->resume_time) + context_to_push->resume_time = ready_tail->resume_time + fc::microseconds(1); + ready_tail->next = context_to_push; + } + ready_tail = context_to_push; + } +#endif + struct task_priority_less + { + bool operator()( task_base* a, task_base* b ) + { + return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num); + } + }; + + struct task_when_less + { + bool operator()( task_base* a, task_base* b ) + { + return a->_when > b->_when; + } + }; void enqueue( task_base* t ) { @@ -250,7 +273,7 @@ namespace fc { } cur = t; - next_task_posted_num += num_ready_tasks; + next_posted_num += num_ready_tasks; unsigned tasks_posted = 0; while (cur) { @@ -262,7 +285,7 @@ namespace fc { } else { - cur->_posted_num = next_task_posted_num - (++tasks_posted); + cur->_posted_num = next_posted_num - (++tasks_posted); task_pqueue.push_back(cur); std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less()); @@ -272,36 +295,46 @@ namespace fc { } } + void move_newly_scheduled_tasks_to_task_pqueue() + { + BOOST_ASSERT(this == thread::current().my); + + // first, if there are any new tasks on 'task_in_queue', which is tasks that + // have been just been async or scheduled, but we haven't processed them. + // move them into the task_sch_queue or task_pqueue, as appropriate + + //DLN: changed from memory_order_consume for boost 1.55. + //This appears to be safest replacement for now, maybe + //can be changed to relaxed later, but needs analysis. + task_base* pending_list = task_in_queue.exchange(0, boost::memory_order_seq_cst); + if (pending_list) + enqueue(pending_list); + + // second, walk through task_sch_queue and move any scheduled tasks that are now + // able to run (because their scheduled time has arrived) to task_pqueue + + while (!task_sch_queue.empty() && + task_sch_queue.front()->_when <= time_point::now()) + { + task_base* ready_task = task_sch_queue.front(); + std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less()); + task_sch_queue.pop_back(); + + ready_task->_posted_num = next_posted_num++; + task_pqueue.push_back(ready_task); + std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less()); + } + } + task_base* dequeue() { // get a new task BOOST_ASSERT( this == thread::current().my ); - - task_base* pending = 0; - //DLN: changed from memory_order_consume for boost 1.55. - //This appears to be safest replacement for now, maybe - //can be changed to relaxed later, but needs analysis. - pending = task_in_queue.exchange(0,boost::memory_order_seq_cst); - if( pending ) - enqueue( pending ); - task_base* p = 0; - if( !task_sch_queue.empty() ) - { - if( task_sch_queue.front()->_when <= time_point::now() ) - { - p = task_sch_queue.front(); - std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() ); - task_sch_queue.pop_back(); - return p; - } - } - if( !task_pqueue.empty() ) - { - p = task_pqueue.front(); - std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); - task_pqueue.pop_back(); - } + assert(!task_pqueue.empty()); + task_base* p = task_pqueue.front(); + std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); + task_pqueue.pop_back(); return p; } @@ -388,10 +421,14 @@ namespace fc { // jump to next context, saving current context fc::context* prev = current; current = next; - if( reschedule ) - ready_push_back(prev); + if (reschedule) + add_context_to_ready_list(prev); // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", + name.c_str(), + prev->cur_task ? prev->cur_task->get_desc() : "unknown", + next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); #elif BOOST_VERSION >= 105300 @@ -427,10 +464,14 @@ namespace fc { current = next; if( reschedule ) - ready_push_back(prev); + add_context_to_ready_list(prev); // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", + name.c_str(), + prev->cur_task ? prev->cur_task->get_desc() : "unknown", + next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); #elif BOOST_VERSION >= 105300 @@ -478,23 +519,20 @@ namespace fc { self->start_next_fiber( false ); } - bool run_next_task() + void run_next_task() { - check_for_timeouts(); - task_base* next = dequeue(); + task_base* next = dequeue(); - if( next ) - { - next->_set_active_context( current ); - current->cur_task = next; - next->run(); - current->cur_task = 0; - next->_set_active_context(0); - next->release(); - current->reinitialize(); - return true; - } - return false; + next->_set_active_context( current ); + current->cur_task = next; + + thread_debug_msg((thread_debug_log, "EMF: [%s] starting task \"%s\"\n", name.c_str(), next->get_desc())); + + next->run(); + current->cur_task = 0; + next->_set_active_context(0); + next->release(); + current->reinitialize(); } bool has_next_task() @@ -517,8 +555,31 @@ namespace fc { { while( !done || blocked ) { - if( run_next_task() ) + // move all new tasks to the task_pqueue + move_newly_scheduled_tasks_to_task_pqueue(); + + // move all now-ready sleeping tasks to the ready list + check_for_timeouts(); + + if (!task_pqueue.empty()) + { + if (ready_head) + { + // a new task and an existing task are both ready to go + if (ready_head->context_posted_num < task_pqueue.front()->_posted_num) + { + // run the existing task first + pt_push_back(current); + start_next_fiber(false); + continue; + } + } + + // if we made it here, either there's no ready context, or the ready context is + // scheduled after the ready task, so we shoudl run the task first + run_next_task(); continue; + } // if I have something else to do other than // process tasks... do it. @@ -613,7 +674,7 @@ namespace fc { // ilog( "..." ); // ilog( "ready_push_front" ); if (c != current) - ready_push_front( c ); + add_context_to_ready_list(c); } } return time_point::min(); @@ -627,8 +688,8 @@ namespace fc { return; } - if( c != current ) - ready_push_front(c); + if (c != current) + add_context_to_ready_list(c); } void yield_until( const time_point& tp, bool reschedule ) { @@ -722,7 +783,7 @@ namespace fc { { fc::context* next_blocked = (*iter)->next_blocked; (*iter)->next_blocked = nullptr; - ready_push_front(*iter); + add_context_to_ready_list(*iter); *iter = next_blocked; continue; } @@ -742,7 +803,7 @@ namespace fc { break; } if (!already_on_ready_list) - ready_push_front(*sleep_iter); + add_context_to_ready_list(*sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter); task_removed_from_sleep_pqueue = true; }