Execute async tasks in the order they were asynced (this only changes the behavior of tasks that hadn't started executing yet, it doesn't change anything about the order blocked tasks unblock)
This commit is contained in:
parent
256df78fc6
commit
5a615e6b21
2 changed files with 55 additions and 15 deletions
|
|
@ -159,3 +159,18 @@ namespace fc
|
|||
#define edump( SEQ ) \
|
||||
elog( FC_FORMAT(SEQ), FC_FORMAT_ARG_PARAMS(SEQ) )
|
||||
|
||||
// this disables all normal logging statements -- not something you'd normally want to do,
|
||||
// but it's useful if you're benchmarking something and suspect logging is causing
|
||||
// a slowdown.
|
||||
#ifdef FC_DISABLE_LOGGING
|
||||
# undef ulog
|
||||
# define ulog(...) do {} while(0)
|
||||
# undef elog
|
||||
# define elog(...) do {} while(0)
|
||||
# undef wlog
|
||||
# define wlog(...) do {} while(0)
|
||||
# undef ilog
|
||||
# define ilog(...) do {} while(0)
|
||||
# undef dlog
|
||||
# define dlog(...) do {} while(0)
|
||||
#endif
|
||||
|
|
@ -21,6 +21,7 @@ namespace fc {
|
|||
thread_d(fc::thread& s)
|
||||
:self(s), boost_thread(0),
|
||||
task_in_queue(0),
|
||||
next_task_posted_num(1),
|
||||
done(false),
|
||||
current(0),
|
||||
pt_head(0),
|
||||
|
|
@ -74,6 +75,7 @@ namespace fc {
|
|||
|
||||
boost::atomic<task_base*> task_in_queue;
|
||||
std::vector<task_base*> task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time
|
||||
uint64_t next_task_posted_num; // each task gets assigned a number in the order it is started, tracked here
|
||||
std::vector<task_base*> task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run
|
||||
std::vector<fc::context*> sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume
|
||||
std::vector<fc::context*> free_list; // list of unused contexts that are ready for deletion
|
||||
|
|
@ -209,23 +211,46 @@ namespace fc {
|
|||
}
|
||||
};
|
||||
|
||||
void enqueue( task_base* t ) {
|
||||
time_point now = time_point::now();
|
||||
task_base* cur = t;
|
||||
while( cur ) {
|
||||
if( cur->_when > now ) {
|
||||
task_sch_queue.push_back(cur);
|
||||
std::push_heap( task_sch_queue.begin(),
|
||||
task_sch_queue.end(), task_when_less() );
|
||||
} else {
|
||||
task_pqueue.push_back(cur);
|
||||
BOOST_ASSERT( this == thread::current().my );
|
||||
std::push_heap( task_pqueue.begin(),
|
||||
task_pqueue.end(), task_priority_less() );
|
||||
}
|
||||
cur = cur->_next;
|
||||
void enqueue( task_base* t )
|
||||
{
|
||||
time_point now = time_point::now();
|
||||
task_base* cur = t;
|
||||
|
||||
// the linked list of tasks passed to enqueue is in the reverse order of
|
||||
// what you'd expect -- the first task to be scheduled is at the end of
|
||||
// the list. We'll rectify the ordering by assigning the _posted_num
|
||||
// in reverse order
|
||||
unsigned num_ready_tasks = 0;
|
||||
while (cur)
|
||||
{
|
||||
if (cur->_when <= now)
|
||||
++num_ready_tasks;
|
||||
cur = cur->_next;
|
||||
}
|
||||
|
||||
cur = t;
|
||||
next_task_posted_num += num_ready_tasks;
|
||||
unsigned tasks_posted = 0;
|
||||
while (cur)
|
||||
{
|
||||
if (cur->_when > now)
|
||||
{
|
||||
task_sch_queue.push_back(cur);
|
||||
std::push_heap(task_sch_queue.begin(),
|
||||
task_sch_queue.end(), task_when_less());
|
||||
}
|
||||
else
|
||||
{
|
||||
cur->_posted_num = next_task_posted_num - (++tasks_posted);
|
||||
task_pqueue.push_back(cur);
|
||||
std::push_heap(task_pqueue.begin(),
|
||||
task_pqueue.end(), task_priority_less());
|
||||
BOOST_ASSERT(this == thread::current().my);
|
||||
}
|
||||
cur = cur->_next;
|
||||
}
|
||||
}
|
||||
|
||||
task_base* dequeue() {
|
||||
// get a new task
|
||||
BOOST_ASSERT( this == thread::current().my );
|
||||
|
|
|
|||
Loading…
Reference in a new issue