From 5a615e6b21331fbad1c465e53e02d64b6afc7d06 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Fri, 3 Oct 2014 16:52:45 -0400 Subject: [PATCH] Execute async tasks in the order they were asynced (this only changes the behavior of tasks that hadn't started executing yet, it doesn't change anything about the order blocked tasks unblock) --- include/fc/log/logger.hpp | 15 +++++++++++ src/thread/thread_d.hpp | 55 ++++++++++++++++++++++++++++----------- 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/include/fc/log/logger.hpp b/include/fc/log/logger.hpp index d456f8b..7121d5f 100644 --- a/include/fc/log/logger.hpp +++ b/include/fc/log/logger.hpp @@ -159,3 +159,18 @@ namespace fc #define edump( SEQ ) \ elog( FC_FORMAT(SEQ), FC_FORMAT_ARG_PARAMS(SEQ) ) +// this disables all normal logging statements -- not something you'd normally want to do, +// but it's useful if you're benchmarking something and suspect logging is causing +// a slowdown. +#ifdef FC_DISABLE_LOGGING +# undef ulog +# define ulog(...) do {} while(0) +# undef elog +# define elog(...) do {} while(0) +# undef wlog +# define wlog(...) do {} while(0) +# undef ilog +# define ilog(...) do {} while(0) +# undef dlog +# define dlog(...) do {} while(0) +#endif \ No newline at end of file diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 05e4fbb..624d580 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -21,6 +21,7 @@ namespace fc { thread_d(fc::thread& s) :self(s), boost_thread(0), task_in_queue(0), + next_task_posted_num(1), done(false), current(0), pt_head(0), @@ -74,6 +75,7 @@ namespace fc { boost::atomic task_in_queue; std::vector task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time + uint64_t next_task_posted_num; // each task gets assigned a number in the order it is started, tracked here std::vector task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run std::vector sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume std::vector free_list; // list of unused contexts that are ready for deletion @@ -209,23 +211,46 @@ namespace fc { } }; - void enqueue( task_base* t ) { - time_point now = time_point::now(); - task_base* cur = t; - while( cur ) { - if( cur->_when > now ) { - task_sch_queue.push_back(cur); - std::push_heap( task_sch_queue.begin(), - task_sch_queue.end(), task_when_less() ); - } else { - task_pqueue.push_back(cur); - BOOST_ASSERT( this == thread::current().my ); - std::push_heap( task_pqueue.begin(), - task_pqueue.end(), task_priority_less() ); - } - cur = cur->_next; + void enqueue( task_base* t ) + { + time_point now = time_point::now(); + task_base* cur = t; + + // the linked list of tasks passed to enqueue is in the reverse order of + // what you'd expect -- the first task to be scheduled is at the end of + // the list. We'll rectify the ordering by assigning the _posted_num + // in reverse order + unsigned num_ready_tasks = 0; + while (cur) + { + if (cur->_when <= now) + ++num_ready_tasks; + cur = cur->_next; + } + + cur = t; + next_task_posted_num += num_ready_tasks; + unsigned tasks_posted = 0; + while (cur) + { + if (cur->_when > now) + { + task_sch_queue.push_back(cur); + std::push_heap(task_sch_queue.begin(), + task_sch_queue.end(), task_when_less()); } + else + { + cur->_posted_num = next_task_posted_num - (++tasks_posted); + task_pqueue.push_back(cur); + std::push_heap(task_pqueue.begin(), + task_pqueue.end(), task_priority_less()); + BOOST_ASSERT(this == thread::current().my); + } + cur = cur->_next; + } } + task_base* dequeue() { // get a new task BOOST_ASSERT( this == thread::current().my );