Schedule fibers in the order they are asynced or unblocked. Earlier behavior was always to start newly-asycned tasks before resuming existing tasks, so existing tasks could be starved if there was a steady stream of new tasks created. Now all tasks are started or resumed in the order they are created or unblocked.

This commit is contained in:
Eric Frias 2014-10-14 14:21:42 -04:00
parent a426bf9710
commit 1af4ac6a5c
3 changed files with 167 additions and 99 deletions

View file

@ -60,7 +60,8 @@ namespace fc {
cancellation_reason(nullptr), cancellation_reason(nullptr),
#endif #endif
complete(false), complete(false),
cur_task(0) cur_task(0),
context_posted_num(0)
{ {
#if BOOST_VERSION >= 105600 #if BOOST_VERSION >= 105600
size_t stack_size = stack_allocator::traits_type::default_size() * 4; size_t stack_size = stack_allocator::traits_type::default_size() * 4;
@ -99,7 +100,8 @@ namespace fc {
cancellation_reason(nullptr), cancellation_reason(nullptr),
#endif #endif
complete(false), complete(false),
cur_task(0) cur_task(0),
context_posted_num(0)
{} {}
~context() { ~context() {
@ -229,6 +231,7 @@ namespace fc {
#endif #endif
bool complete; bool complete;
task_base* cur_task; task_base* cur_task;
uint64_t context_posted_num; // serial number set each tiem the context is added to the ready list
}; };
} // naemspace fc } // naemspace fc

View file

@ -54,6 +54,10 @@ static void set_thread_name(const char* threadName)
#endif #endif
namespace fc { namespace fc {
#ifdef ENABLE_FC_THREAD_DEBUG_LOG
FILE* thread_debug_log = fopen("C:/thread_debug.log", "w");
#endif
const char* thread_name() { const char* thread_name() {
return thread::current().name().c_str(); return thread::current().name().c_str();
} }
@ -173,7 +177,7 @@ namespace fc {
// move all sleep tasks to ready // move all sleep tasks to ready
for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) {
my->ready_push_front( my->sleep_pqueue[i] ); my->add_context_to_ready_list( my->sleep_pqueue[i] );
} }
my->sleep_pqueue.clear(); my->sleep_pqueue.clear();
@ -182,7 +186,7 @@ namespace fc {
while( cur ) { while( cur ) {
fc::context* n = cur->next; fc::context* n = cur->next;
cur->next = 0; cur->next = 0;
my->ready_push_front( cur ); my->add_context_to_ready_list( cur );
cur = n; cur = n;
} }
@ -287,7 +291,7 @@ namespace fc {
} }
void thread::async_task( task_base* t, const priority& p ) { void thread::async_task( task_base* t, const priority& p ) {
async_task( t, p, time_point::min() ); async_task( t, p, time_point::min() );
} }
void thread::poke() { void thread::poke() {
@ -407,7 +411,7 @@ namespace fc {
cur_blocked = my->blocked; cur_blocked = my->blocked;
} }
cur->next_blocked = 0; cur->next_blocked = 0;
my->ready_push_front( cur ); my->add_context_to_ready_list( cur );
} else { // goto the next blocked task } else { // goto the next blocked task
prev_blocked = cur_blocked; prev_blocked = cur_blocked;
cur_blocked = cur_blocked->next_blocked; cur_blocked = cur_blocked->next_blocked;

View file

@ -9,7 +9,16 @@
#include <vector> #include <vector>
//#include <fc/logger.hpp> //#include <fc/logger.hpp>
//#define ENABLE_FC_THREAD_DEBUG_LOG
#ifdef ENABLE_FC_THREAD_DEBUG_LOG
# define thread_debug_msg(x) do { fprintf x; } while (0)
#else
# define thread_debug_msg(x) do {} while (0)
#endif
namespace fc { namespace fc {
extern FILE* thread_debug_log;
struct sleep_priority_less { struct sleep_priority_less {
bool operator()( const context::ptr& a, const context::ptr& b ) { bool operator()( const context::ptr& a, const context::ptr& b ) {
return a->resume_time > b->resume_time; return a->resume_time > b->resume_time;
@ -21,7 +30,7 @@ namespace fc {
thread_d(fc::thread& s) thread_d(fc::thread& s)
:self(s), boost_thread(0), :self(s), boost_thread(0),
task_in_queue(0), task_in_queue(0),
next_task_posted_num(1), next_posted_num(1),
done(false), done(false),
current(0), current(0),
pt_head(0), pt_head(0),
@ -78,7 +87,7 @@ namespace fc {
boost::atomic<task_base*> task_in_queue; boost::atomic<task_base*> task_in_queue;
std::vector<task_base*> task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time std::vector<task_base*> task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time
uint64_t next_task_posted_num; // each task gets assigned a number in the order it is started, tracked here uint64_t next_posted_num; // each task or context gets assigned a number in the order it is ready to execute, tracked here
std::vector<task_base*> task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run std::vector<task_base*> task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run
std::vector<fc::context*> sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume std::vector<fc::context*> sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume
std::vector<fc::context*> free_list; // list of unused contexts that are ready for deletion std::vector<fc::context*> free_list; // list of unused contexts that are ready for deletion
@ -187,50 +196,64 @@ namespace fc {
return tmp; return tmp;
} }
void ready_push_front( const fc::context::ptr& c ) void add_context_to_ready_list(context* context_to_add)
{ {
BOOST_ASSERT( c->next == nullptr ); if (!ready_tail)
BOOST_ASSERT( c != current ); ready_head = context_to_add;
// if( c == current ) else
// wlog( "pushing current to ready??" ); {
c->next = ready_head; context_to_add->context_posted_num = ++next_posted_num;
ready_head = c; ready_tail->next = context_to_add;
if( !ready_tail ) }
ready_tail = c; ready_tail = context_to_add;
} }
void ready_push_back( const fc::context::ptr& c ) #if 0
{ void ready_push_front(const fc::context::ptr& context_to_push)
BOOST_ASSERT( c->next == nullptr ); {
BOOST_ASSERT( c != current ); BOOST_ASSERT(context_to_push->next == nullptr);
// if( c == current ) BOOST_ASSERT(context_to_push != current);
// wlog( "pushing current to ready??" );
c->next = 0;
if( ready_tail )
ready_tail->next = c;
else
{
assert( !ready_head );
ready_head = c;
}
ready_tail = c;
}
struct task_priority_less
{
bool operator()( task_base* a, task_base* b )
{
return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num );
}
};
struct task_when_less context** iter = &ready_head;
{ while (*iter && (*iter)->resume_time > context_to_push->resume_time)
bool operator()( task_base* a, task_base* b ) iter = &((*iter)->next);
{ context_to_push->next = *iter;
return a->_when > b->_when; *iter = context_to_push;
} if (!context_to_push->next)
}; ready_tail = context_to_push;
}
void ready_push_back(const fc::context::ptr& context_to_push)
{
BOOST_ASSERT(context_to_push->next == nullptr);
BOOST_ASSERT(context_to_push != current);
if (!ready_tail)
ready_head = context_to_push;
else
{
if (context_to_push->resume_time <= ready_tail->resume_time)
context_to_push->resume_time = ready_tail->resume_time + fc::microseconds(1);
ready_tail->next = context_to_push;
}
ready_tail = context_to_push;
}
#endif
struct task_priority_less
{
bool operator()( task_base* a, task_base* b )
{
return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num);
}
};
struct task_when_less
{
bool operator()( task_base* a, task_base* b )
{
return a->_when > b->_when;
}
};
void enqueue( task_base* t ) void enqueue( task_base* t )
{ {
@ -250,7 +273,7 @@ namespace fc {
} }
cur = t; cur = t;
next_task_posted_num += num_ready_tasks; next_posted_num += num_ready_tasks;
unsigned tasks_posted = 0; unsigned tasks_posted = 0;
while (cur) while (cur)
{ {
@ -262,7 +285,7 @@ namespace fc {
} }
else else
{ {
cur->_posted_num = next_task_posted_num - (++tasks_posted); cur->_posted_num = next_posted_num - (++tasks_posted);
task_pqueue.push_back(cur); task_pqueue.push_back(cur);
std::push_heap(task_pqueue.begin(), std::push_heap(task_pqueue.begin(),
task_pqueue.end(), task_priority_less()); task_pqueue.end(), task_priority_less());
@ -272,36 +295,46 @@ namespace fc {
} }
} }
void move_newly_scheduled_tasks_to_task_pqueue()
{
BOOST_ASSERT(this == thread::current().my);
// first, if there are any new tasks on 'task_in_queue', which is tasks that
// have been just been async or scheduled, but we haven't processed them.
// move them into the task_sch_queue or task_pqueue, as appropriate
//DLN: changed from memory_order_consume for boost 1.55.
//This appears to be safest replacement for now, maybe
//can be changed to relaxed later, but needs analysis.
task_base* pending_list = task_in_queue.exchange(0, boost::memory_order_seq_cst);
if (pending_list)
enqueue(pending_list);
// second, walk through task_sch_queue and move any scheduled tasks that are now
// able to run (because their scheduled time has arrived) to task_pqueue
while (!task_sch_queue.empty() &&
task_sch_queue.front()->_when <= time_point::now())
{
task_base* ready_task = task_sch_queue.front();
std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less());
task_sch_queue.pop_back();
ready_task->_posted_num = next_posted_num++;
task_pqueue.push_back(ready_task);
std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less());
}
}
task_base* dequeue() task_base* dequeue()
{ {
// get a new task // get a new task
BOOST_ASSERT( this == thread::current().my ); BOOST_ASSERT( this == thread::current().my );
task_base* pending = 0;
//DLN: changed from memory_order_consume for boost 1.55.
//This appears to be safest replacement for now, maybe
//can be changed to relaxed later, but needs analysis.
pending = task_in_queue.exchange(0,boost::memory_order_seq_cst);
if( pending )
enqueue( pending );
task_base* p = 0; assert(!task_pqueue.empty());
if( !task_sch_queue.empty() ) task_base* p = task_pqueue.front();
{ std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() );
if( task_sch_queue.front()->_when <= time_point::now() ) task_pqueue.pop_back();
{
p = task_sch_queue.front();
std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() );
task_sch_queue.pop_back();
return p;
}
}
if( !task_pqueue.empty() )
{
p = task_pqueue.front();
std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() );
task_pqueue.pop_back();
}
return p; return p;
} }
@ -388,10 +421,14 @@ namespace fc {
// jump to next context, saving current context // jump to next context, saving current context
fc::context* prev = current; fc::context* prev = current;
current = next; current = next;
if( reschedule ) if (reschedule)
ready_push_back(prev); add_context_to_ready_list(prev);
// slog( "jump to %p from %p", next, prev ); // slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n",
name.c_str(),
prev->cur_task ? prev->cur_task->get_desc() : "unknown",
next->cur_task ? next->cur_task->get_desc() : "unknown"));
#if BOOST_VERSION >= 105600 #if BOOST_VERSION >= 105600
bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); bc::jump_fcontext( &prev->my_context, next->my_context, 0 );
#elif BOOST_VERSION >= 105300 #elif BOOST_VERSION >= 105300
@ -427,10 +464,14 @@ namespace fc {
current = next; current = next;
if( reschedule ) if( reschedule )
ready_push_back(prev); add_context_to_ready_list(prev);
// slog( "jump to %p from %p", next, prev ); // slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n",
name.c_str(),
prev->cur_task ? prev->cur_task->get_desc() : "unknown",
next->cur_task ? next->cur_task->get_desc() : "unknown"));
#if BOOST_VERSION >= 105600 #if BOOST_VERSION >= 105600
bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this );
#elif BOOST_VERSION >= 105300 #elif BOOST_VERSION >= 105300
@ -478,23 +519,20 @@ namespace fc {
self->start_next_fiber( false ); self->start_next_fiber( false );
} }
bool run_next_task() void run_next_task()
{ {
check_for_timeouts(); task_base* next = dequeue();
task_base* next = dequeue();
if( next ) next->_set_active_context( current );
{ current->cur_task = next;
next->_set_active_context( current );
current->cur_task = next; thread_debug_msg((thread_debug_log, "EMF: [%s] starting task \"%s\"\n", name.c_str(), next->get_desc()));
next->run();
current->cur_task = 0; next->run();
next->_set_active_context(0); current->cur_task = 0;
next->release(); next->_set_active_context(0);
current->reinitialize(); next->release();
return true; current->reinitialize();
}
return false;
} }
bool has_next_task() bool has_next_task()
@ -517,8 +555,31 @@ namespace fc {
{ {
while( !done || blocked ) while( !done || blocked )
{ {
if( run_next_task() ) // move all new tasks to the task_pqueue
move_newly_scheduled_tasks_to_task_pqueue();
// move all now-ready sleeping tasks to the ready list
check_for_timeouts();
if (!task_pqueue.empty())
{
if (ready_head)
{
// a new task and an existing task are both ready to go
if (ready_head->context_posted_num < task_pqueue.front()->_posted_num)
{
// run the existing task first
pt_push_back(current);
start_next_fiber(false);
continue;
}
}
// if we made it here, either there's no ready context, or the ready context is
// scheduled after the ready task, so we shoudl run the task first
run_next_task();
continue; continue;
}
// if I have something else to do other than // if I have something else to do other than
// process tasks... do it. // process tasks... do it.
@ -613,7 +674,7 @@ namespace fc {
// ilog( "..." ); // ilog( "..." );
// ilog( "ready_push_front" ); // ilog( "ready_push_front" );
if (c != current) if (c != current)
ready_push_front( c ); add_context_to_ready_list(c);
} }
} }
return time_point::min(); return time_point::min();
@ -627,8 +688,8 @@ namespace fc {
return; return;
} }
if( c != current ) if (c != current)
ready_push_front(c); add_context_to_ready_list(c);
} }
void yield_until( const time_point& tp, bool reschedule ) { void yield_until( const time_point& tp, bool reschedule ) {
@ -722,7 +783,7 @@ namespace fc {
{ {
fc::context* next_blocked = (*iter)->next_blocked; fc::context* next_blocked = (*iter)->next_blocked;
(*iter)->next_blocked = nullptr; (*iter)->next_blocked = nullptr;
ready_push_front(*iter); add_context_to_ready_list(*iter);
*iter = next_blocked; *iter = next_blocked;
continue; continue;
} }
@ -742,7 +803,7 @@ namespace fc {
break; break;
} }
if (!already_on_ready_list) if (!already_on_ready_list)
ready_push_front(*sleep_iter); add_context_to_ready_list(*sleep_iter);
sleep_iter = sleep_pqueue.erase(sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter);
task_removed_from_sleep_pqueue = true; task_removed_from_sleep_pqueue = true;
} }