From a426bf9710417a8c29a8ce187a424cc651ce071c Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 13 Oct 2014 11:44:16 -0400 Subject: [PATCH 1/8] whitespace and spelling fixes, no functional changes --- src/thread/thread_d.hpp | 310 ++++++++++++++++++++++++---------------- 1 file changed, 190 insertions(+), 120 deletions(-) diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 624d580..eda1c74 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -37,7 +37,9 @@ namespace fc { name = fc::string("th_") + char('a'+cnt++); // printf("thread=%p\n",this); } - ~thread_d(){ + + ~thread_d() + { delete current; fc::context* temp; while (ready_head) @@ -66,7 +68,8 @@ namespace fc { boost_thread->detach(); delete boost_thread; } - } + } + fc::thread& self; boost::thread* boost_thread; stack_allocator stack_alloc; @@ -149,12 +152,14 @@ namespace fc { } #endif // insert at from of blocked linked list - inline void add_to_blocked( fc::context* c ) { + inline void add_to_blocked( fc::context* c ) + { c->next_blocked = blocked; blocked = c; } - void pt_push_back(fc::context* c) { + void pt_push_back(fc::context* c) + { c->next = pt_head; pt_head = c; /* @@ -167,9 +172,12 @@ namespace fc { wlog( "idle context...%2% %1%", c, i ); */ } - fc::context::ptr ready_pop_front() { + + fc::context::ptr ready_pop_front() + { fc::context::ptr tmp = nullptr; - if( ready_head ) { + if( ready_head ) + { tmp = ready_head; ready_head = tmp->next; if( !ready_head ) @@ -178,35 +186,48 @@ namespace fc { } return tmp; } - void ready_push_front( const fc::context::ptr& c ) { + + void ready_push_front( const fc::context::ptr& c ) + { BOOST_ASSERT( c->next == nullptr ); BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); + // if( c == current ) + // wlog( "pushing current to ready??" ); c->next = ready_head; ready_head = c; if( !ready_tail ) ready_tail = c; } - void ready_push_back( const fc::context::ptr& c ) { + + void ready_push_back( const fc::context::ptr& c ) + { BOOST_ASSERT( c->next == nullptr ); BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); + // if( c == current ) + // wlog( "pushing current to ready??" ); c->next = 0; - if( ready_tail ) { + if( ready_tail ) ready_tail->next = c; - } else { - assert( !ready_head ); - ready_head = c; + else + { + assert( !ready_head ); + ready_head = c; } ready_tail = c; } - struct task_priority_less { - bool operator()( task_base* a, task_base* b ) { + + struct task_priority_less + { + bool operator()( task_base* a, task_base* b ) + { return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num ); } }; - struct task_when_less { - bool operator()( task_base* a, task_base* b ) { + + struct task_when_less + { + bool operator()( task_base* a, task_base* b ) + { return a->_when > b->_when; } }; @@ -251,7 +272,8 @@ namespace fc { } } - task_base* dequeue() { + task_base* dequeue() + { // get a new task BOOST_ASSERT( this == thread::current().my ); @@ -260,18 +282,22 @@ namespace fc { //This appears to be safest replacement for now, maybe //can be changed to relaxed later, but needs analysis. pending = task_in_queue.exchange(0,boost::memory_order_seq_cst); - if( pending ) { enqueue( pending ); } + if( pending ) + enqueue( pending ); - task_base* p(0); - if( task_sch_queue.size() ) { - if( task_sch_queue.front()->_when <= time_point::now() ) { + task_base* p = 0; + if( !task_sch_queue.empty() ) + { + if( task_sch_queue.front()->_when <= time_point::now() ) + { p = task_sch_queue.front(); std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() ); task_sch_queue.pop_back(); return p; } } - if( task_pqueue.size() ) { + if( !task_pqueue.empty() ) + { p = task_pqueue.front(); std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); task_pqueue.pop_back(); @@ -307,7 +333,8 @@ namespace fc { * This should be before or after a context switch to * detect quit/cancel operations and throw an exception. */ - void check_fiber_exceptions() { + void check_fiber_exceptions() + { if( current && current->canceled ) { #ifdef NDEBUG FC_THROW_EXCEPTION( canceled_exception, "" ); @@ -317,7 +344,7 @@ namespace fc { } else if( done ) { ilog( "throwing canceled exception" ); FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" ); - // BOOST_THROW_EXCEPTION( thread_quit() ); + // BOOST_THROW_EXCEPTION( thread_quit() ); } } @@ -326,7 +353,8 @@ namespace fc { * If none are available then create a new context and * have it wait for something to do. */ - bool start_next_fiber( bool reschedule = false ) { + bool start_next_fiber( bool reschedule = false ) + { /* If this assert fires, it means you are executing an operation that is causing * the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect * (somewhere up the stack) */ @@ -342,54 +370,67 @@ namespace fc { assert(std::current_exception() == std::exception_ptr()); check_for_timeouts(); - if( !current ) current = new fc::context( &fc::thread::current() ); + if( !current ) + current = new fc::context( &fc::thread::current() ); // check to see if any other contexts are ready - if( ready_head ) { + if( ready_head ) + { fc::context* next = ready_pop_front(); - if( next == current ) { + if( next == current ) + { // elog( "next == current... something went wrong" ); assert(next != current); - return false; + return false; } BOOST_ASSERT( next != current ); // jump to next context, saving current context fc::context* prev = current; current = next; - if( reschedule ) ready_push_back(prev); - // slog( "jump to %p from %p", next, prev ); - // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + if( reschedule ) + ready_push_back(prev); + // slog( "jump to %p from %p", next, prev ); + // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); #if BOOST_VERSION >= 105600 - bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); + bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); #elif BOOST_VERSION >= 105300 - bc::jump_fcontext( prev->my_context, next->my_context, 0 ); + bc::jump_fcontext( prev->my_context, next->my_context, 0 ); #else - bc::jump_fcontext( &prev->my_context, &next->my_context, 0 ); + bc::jump_fcontext( &prev->my_context, &next->my_context, 0 ); #endif - BOOST_ASSERT( current ); - BOOST_ASSERT( current == prev ); + BOOST_ASSERT( current ); + BOOST_ASSERT( current == prev ); //current = prev; - } else { // all contexts are blocked, create a new context - // that will process posted tasks... + } + else + { + // all contexts are blocked, create a new context + // that will process posted tasks... fc::context* prev = current; fc::context* next = nullptr; - if( pt_head ) { // grab cached context + if( pt_head ) + { + // grab cached context next = pt_head; pt_head = pt_head->next; next->next = 0; next->reinitialize(); - } else { // create new context. + } + else + { + // create new context. next = new fc::context( &thread_d::start_process_tasks, stack_alloc, - &fc::thread::current() ); + &fc::thread::current() ); } current = next; - if( reschedule ) ready_push_back(prev); + if( reschedule ) + ready_push_back(prev); - // slog( "jump to %p from %p", next, prev ); - // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + // slog( "jump to %p from %p", next, prev ); + // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); #elif BOOST_VERSION >= 105300 @@ -402,7 +443,8 @@ namespace fc { //current = prev; } - if( current->canceled ) { + if( current->canceled ) + { //current->canceled = false; #ifdef NDEBUG FC_THROW_EXCEPTION( canceled_exception, "" ); @@ -414,27 +456,35 @@ namespace fc { return true; } - static void start_process_tasks( intptr_t my ) { + static void start_process_tasks( intptr_t my ) + { thread_d* self = (thread_d*)my; - try { + try + { self->process_tasks(); - } catch ( canceled_exception& ) { - // allowed exception... - } catch ( ... ) { - elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) ); - // assert( !"fiber exited with uncaught exception" ); - //TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< - // boost::current_exception_diagnostic_information() <name) ); + // assert( !"fiber exited with uncaught exception" ); + //TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< + // boost::current_exception_diagnostic_information() <free_list.push_back(self->current); self->start_next_fiber( false ); } - bool run_next_task() { + bool run_next_task() + { check_for_timeouts(); task_base* next = dequeue(); - if( next ) { + if( next ) + { next->_set_active_context( current ); current->cur_task = next; next->run(); @@ -446,32 +496,41 @@ namespace fc { } return false; } - bool has_next_task() { + + bool has_next_task() + { if( task_pqueue.size() || (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) || task_in_queue.load( boost::memory_order_relaxed ) ) - return true; + return true; return false; } - void clear_free_list() { - for( uint32_t i = 0; i < free_list.size(); ++i ) { + + void clear_free_list() + { + for( uint32_t i = 0; i < free_list.size(); ++i ) delete free_list[i]; - } free_list.clear(); } - void process_tasks() { - while( !done || blocked ) { - if( run_next_task() ) continue; + + void process_tasks() + { + while( !done || blocked ) + { + if( run_next_task() ) + continue; // if I have something else to do other than // process tasks... do it. - if( ready_head ) { + if( ready_head ) + { pt_push_back( current ); start_next_fiber(false); continue; } - if( process_canceled_tasks() ) continue; + if( process_canceled_tasks() ) + continue; clear_free_list(); @@ -481,9 +540,12 @@ namespace fc { time_point timeout_time = check_for_timeouts(); if( done ) return; - if( timeout_time == time_point::maximum() ) { + if( timeout_time == time_point::maximum() ) + { task_ready.wait( lock ); - } else if( timeout_time != time_point::min() ) { + } + else if( timeout_time != time_point::min() ) + { // there may be tasks that have been canceled we should filter them out now // rather than waiting... @@ -515,10 +577,12 @@ namespace fc { * Retunn system_clock::time_point::max() if there are no scheduled tasks * Return the time the next task needs to be run if there is anything scheduled. */ - time_point check_for_timeouts() { - if( !sleep_pqueue.size() && !task_sch_queue.size() ) { - //ilog( "no timeouts ready" ); - return time_point::maximum(); + time_point check_for_timeouts() + { + if( !sleep_pqueue.size() && !task_sch_queue.size() ) + { + // ilog( "no timeouts ready" ); + return time_point::maximum(); } time_point next = time_point::maximum(); @@ -528,51 +592,53 @@ namespace fc { next = task_sch_queue.front()->_when; time_point now = time_point::now(); - if( now < next ) { return next; } + if( now < next ) + return next; // move all expired sleeping tasks to the ready queue while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) { - fc::context::ptr c = sleep_pqueue.front(); - std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); - //ilog( "sleep pop back..." ); - sleep_pqueue.pop_back(); + fc::context::ptr c = sleep_pqueue.front(); + std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); + // ilog( "sleep pop back..." ); + sleep_pqueue.pop_back(); - if( c->blocking_prom.size() ) - { - // ilog( "timeotu blocking prom" ); - c->timeout_blocking_promises(); - } - else - { - //ilog( "..." ); - //ilog( "ready_push_front" ); - if (c != current) - ready_push_front( c ); - } + if( c->blocking_prom.size() ) + { + // ilog( "timeout blocking prom" ); + c->timeout_blocking_promises(); + } + else + { + // ilog( "..." ); + // ilog( "ready_push_front" ); + if (c != current) + ready_push_front( c ); + } } return time_point::min(); } - void unblock( fc::context* c ) { - if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); }, "thread_d::unblock" ); - return; - } - if( c != current ) ready_push_front(c); - } + void unblock( fc::context* c ) + { + if( fc::thread::current().my != this ) + { + self.async( [=](){ unblock(c); }, "thread_d::unblock" ); + return; + } + + if( c != current ) + ready_push_front(c); + } void yield_until( const time_point& tp, bool reschedule ) { check_fiber_exceptions(); if( tp <= (time_point::now()+fc::microseconds(10000)) ) - { return; - } - if( !current ) { + if( !current ) current = new fc::context(&fc::thread::current()); - } current->resume_time = tp; current->clear_blocking_promises(); @@ -584,8 +650,10 @@ namespace fc { start_next_fiber(reschedule); // clear current context from sleep queue... - for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) { - if( sleep_pqueue[i] == current ) { + for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) + { + if( sleep_pqueue[i] == current ) + { sleep_pqueue[i] = sleep_pqueue.back(); sleep_pqueue.pop_back(); std::make_heap( sleep_pqueue.begin(), @@ -599,35 +667,37 @@ namespace fc { } void wait( const promise_base::ptr& p, const time_point& timeout ) { - if( p->ready() ) return; + if( p->ready() ) + return; + if( timeout < time_point::now() ) - FC_THROW_EXCEPTION( timeout_exception, "" ); + FC_THROW_EXCEPTION( timeout_exception, "" ); - if( !current ) { + if( !current ) current = new fc::context(&fc::thread::current()); - } - //slog( " %1% blocking on %2%", current, p.get() ); + // slog( " %1% blocking on %2%", current, p.get() ); current->add_blocking_promise(p.get(),true); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { - current->resume_time = timeout; - sleep_pqueue.push_back(current); - std::push_heap( sleep_pqueue.begin(), - sleep_pqueue.end(), - sleep_priority_less() ); + if( timeout != time_point::maximum() ) + { + current->resume_time = timeout; + sleep_pqueue.push_back(current); + std::push_heap( sleep_pqueue.begin(), + sleep_pqueue.end(), + sleep_priority_less() ); } - // elog( "blocking %1%", current ); + // elog( "blocking %1%", current ); add_to_blocked( current ); - // debug("swtiching fibers..." ); + // debug("swtiching fibers..." ); start_next_fiber(); - // slog( "resuming %1%", current ); + // slog( "resuming %1%", current ); - //slog( " %1% unblocking blocking on %2%", current, p.get() ); + // slog( " %1% unblocking blocking on %2%", current, p.get() ); current->remove_blocking_promise(p.get()); check_fiber_exceptions(); From 1af4ac6a5cd4e96993e0f42dd62da45fbe15ceff Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Tue, 14 Oct 2014 14:21:42 -0400 Subject: [PATCH 2/8] Schedule fibers in the order they are asynced or unblocked. Earlier behavior was always to start newly-asycned tasks before resuming existing tasks, so existing tasks could be starved if there was a steady stream of new tasks created. Now all tasks are started or resumed in the order they are created or unblocked. --- src/thread/context.hpp | 7 +- src/thread/thread.cpp | 12 +- src/thread/thread_d.hpp | 247 +++++++++++++++++++++++++--------------- 3 files changed, 167 insertions(+), 99 deletions(-) diff --git a/src/thread/context.hpp b/src/thread/context.hpp index 9801499..f7d15e9 100644 --- a/src/thread/context.hpp +++ b/src/thread/context.hpp @@ -60,7 +60,8 @@ namespace fc { cancellation_reason(nullptr), #endif complete(false), - cur_task(0) + cur_task(0), + context_posted_num(0) { #if BOOST_VERSION >= 105600 size_t stack_size = stack_allocator::traits_type::default_size() * 4; @@ -99,7 +100,8 @@ namespace fc { cancellation_reason(nullptr), #endif complete(false), - cur_task(0) + cur_task(0), + context_posted_num(0) {} ~context() { @@ -229,6 +231,7 @@ namespace fc { #endif bool complete; task_base* cur_task; + uint64_t context_posted_num; // serial number set each tiem the context is added to the ready list }; } // naemspace fc diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index ea12d24..60286a0 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -54,6 +54,10 @@ static void set_thread_name(const char* threadName) #endif namespace fc { +#ifdef ENABLE_FC_THREAD_DEBUG_LOG + FILE* thread_debug_log = fopen("C:/thread_debug.log", "w"); +#endif + const char* thread_name() { return thread::current().name().c_str(); } @@ -173,7 +177,7 @@ namespace fc { // move all sleep tasks to ready for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { - my->ready_push_front( my->sleep_pqueue[i] ); + my->add_context_to_ready_list( my->sleep_pqueue[i] ); } my->sleep_pqueue.clear(); @@ -182,7 +186,7 @@ namespace fc { while( cur ) { fc::context* n = cur->next; cur->next = 0; - my->ready_push_front( cur ); + my->add_context_to_ready_list( cur ); cur = n; } @@ -287,7 +291,7 @@ namespace fc { } void thread::async_task( task_base* t, const priority& p ) { - async_task( t, p, time_point::min() ); + async_task( t, p, time_point::min() ); } void thread::poke() { @@ -407,7 +411,7 @@ namespace fc { cur_blocked = my->blocked; } cur->next_blocked = 0; - my->ready_push_front( cur ); + my->add_context_to_ready_list( cur ); } else { // goto the next blocked task prev_blocked = cur_blocked; cur_blocked = cur_blocked->next_blocked; diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index eda1c74..0fd8eb8 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -9,7 +9,16 @@ #include //#include +//#define ENABLE_FC_THREAD_DEBUG_LOG +#ifdef ENABLE_FC_THREAD_DEBUG_LOG +# define thread_debug_msg(x) do { fprintf x; } while (0) +#else +# define thread_debug_msg(x) do {} while (0) +#endif + namespace fc { + extern FILE* thread_debug_log; + struct sleep_priority_less { bool operator()( const context::ptr& a, const context::ptr& b ) { return a->resume_time > b->resume_time; @@ -21,7 +30,7 @@ namespace fc { thread_d(fc::thread& s) :self(s), boost_thread(0), task_in_queue(0), - next_task_posted_num(1), + next_posted_num(1), done(false), current(0), pt_head(0), @@ -78,7 +87,7 @@ namespace fc { boost::atomic task_in_queue; std::vector task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time - uint64_t next_task_posted_num; // each task gets assigned a number in the order it is started, tracked here + uint64_t next_posted_num; // each task or context gets assigned a number in the order it is ready to execute, tracked here std::vector task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run std::vector sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume std::vector free_list; // list of unused contexts that are ready for deletion @@ -187,50 +196,64 @@ namespace fc { return tmp; } - void ready_push_front( const fc::context::ptr& c ) + void add_context_to_ready_list(context* context_to_add) { - BOOST_ASSERT( c->next == nullptr ); - BOOST_ASSERT( c != current ); - // if( c == current ) - // wlog( "pushing current to ready??" ); - c->next = ready_head; - ready_head = c; - if( !ready_tail ) - ready_tail = c; + if (!ready_tail) + ready_head = context_to_add; + else + { + context_to_add->context_posted_num = ++next_posted_num; + ready_tail->next = context_to_add; + } + ready_tail = context_to_add; } - void ready_push_back( const fc::context::ptr& c ) - { - BOOST_ASSERT( c->next == nullptr ); - BOOST_ASSERT( c != current ); - // if( c == current ) - // wlog( "pushing current to ready??" ); - c->next = 0; - if( ready_tail ) - ready_tail->next = c; - else - { - assert( !ready_head ); - ready_head = c; - } - ready_tail = c; - } - - struct task_priority_less - { - bool operator()( task_base* a, task_base* b ) - { - return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num ); - } - }; +#if 0 + void ready_push_front(const fc::context::ptr& context_to_push) + { + BOOST_ASSERT(context_to_push->next == nullptr); + BOOST_ASSERT(context_to_push != current); - struct task_when_less - { - bool operator()( task_base* a, task_base* b ) - { - return a->_when > b->_when; - } - }; + context** iter = &ready_head; + while (*iter && (*iter)->resume_time > context_to_push->resume_time) + iter = &((*iter)->next); + context_to_push->next = *iter; + *iter = context_to_push; + if (!context_to_push->next) + ready_tail = context_to_push; + } + + void ready_push_back(const fc::context::ptr& context_to_push) + { + BOOST_ASSERT(context_to_push->next == nullptr); + BOOST_ASSERT(context_to_push != current); + + if (!ready_tail) + ready_head = context_to_push; + else + { + if (context_to_push->resume_time <= ready_tail->resume_time) + context_to_push->resume_time = ready_tail->resume_time + fc::microseconds(1); + ready_tail->next = context_to_push; + } + ready_tail = context_to_push; + } +#endif + struct task_priority_less + { + bool operator()( task_base* a, task_base* b ) + { + return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num); + } + }; + + struct task_when_less + { + bool operator()( task_base* a, task_base* b ) + { + return a->_when > b->_when; + } + }; void enqueue( task_base* t ) { @@ -250,7 +273,7 @@ namespace fc { } cur = t; - next_task_posted_num += num_ready_tasks; + next_posted_num += num_ready_tasks; unsigned tasks_posted = 0; while (cur) { @@ -262,7 +285,7 @@ namespace fc { } else { - cur->_posted_num = next_task_posted_num - (++tasks_posted); + cur->_posted_num = next_posted_num - (++tasks_posted); task_pqueue.push_back(cur); std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less()); @@ -272,36 +295,46 @@ namespace fc { } } + void move_newly_scheduled_tasks_to_task_pqueue() + { + BOOST_ASSERT(this == thread::current().my); + + // first, if there are any new tasks on 'task_in_queue', which is tasks that + // have been just been async or scheduled, but we haven't processed them. + // move them into the task_sch_queue or task_pqueue, as appropriate + + //DLN: changed from memory_order_consume for boost 1.55. + //This appears to be safest replacement for now, maybe + //can be changed to relaxed later, but needs analysis. + task_base* pending_list = task_in_queue.exchange(0, boost::memory_order_seq_cst); + if (pending_list) + enqueue(pending_list); + + // second, walk through task_sch_queue and move any scheduled tasks that are now + // able to run (because their scheduled time has arrived) to task_pqueue + + while (!task_sch_queue.empty() && + task_sch_queue.front()->_when <= time_point::now()) + { + task_base* ready_task = task_sch_queue.front(); + std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less()); + task_sch_queue.pop_back(); + + ready_task->_posted_num = next_posted_num++; + task_pqueue.push_back(ready_task); + std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less()); + } + } + task_base* dequeue() { // get a new task BOOST_ASSERT( this == thread::current().my ); - - task_base* pending = 0; - //DLN: changed from memory_order_consume for boost 1.55. - //This appears to be safest replacement for now, maybe - //can be changed to relaxed later, but needs analysis. - pending = task_in_queue.exchange(0,boost::memory_order_seq_cst); - if( pending ) - enqueue( pending ); - task_base* p = 0; - if( !task_sch_queue.empty() ) - { - if( task_sch_queue.front()->_when <= time_point::now() ) - { - p = task_sch_queue.front(); - std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() ); - task_sch_queue.pop_back(); - return p; - } - } - if( !task_pqueue.empty() ) - { - p = task_pqueue.front(); - std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); - task_pqueue.pop_back(); - } + assert(!task_pqueue.empty()); + task_base* p = task_pqueue.front(); + std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); + task_pqueue.pop_back(); return p; } @@ -388,10 +421,14 @@ namespace fc { // jump to next context, saving current context fc::context* prev = current; current = next; - if( reschedule ) - ready_push_back(prev); + if (reschedule) + add_context_to_ready_list(prev); // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", + name.c_str(), + prev->cur_task ? prev->cur_task->get_desc() : "unknown", + next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); #elif BOOST_VERSION >= 105300 @@ -427,10 +464,14 @@ namespace fc { current = next; if( reschedule ) - ready_push_back(prev); + add_context_to_ready_list(prev); // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", + name.c_str(), + prev->cur_task ? prev->cur_task->get_desc() : "unknown", + next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); #elif BOOST_VERSION >= 105300 @@ -478,23 +519,20 @@ namespace fc { self->start_next_fiber( false ); } - bool run_next_task() + void run_next_task() { - check_for_timeouts(); - task_base* next = dequeue(); + task_base* next = dequeue(); - if( next ) - { - next->_set_active_context( current ); - current->cur_task = next; - next->run(); - current->cur_task = 0; - next->_set_active_context(0); - next->release(); - current->reinitialize(); - return true; - } - return false; + next->_set_active_context( current ); + current->cur_task = next; + + thread_debug_msg((thread_debug_log, "EMF: [%s] starting task \"%s\"\n", name.c_str(), next->get_desc())); + + next->run(); + current->cur_task = 0; + next->_set_active_context(0); + next->release(); + current->reinitialize(); } bool has_next_task() @@ -517,8 +555,31 @@ namespace fc { { while( !done || blocked ) { - if( run_next_task() ) + // move all new tasks to the task_pqueue + move_newly_scheduled_tasks_to_task_pqueue(); + + // move all now-ready sleeping tasks to the ready list + check_for_timeouts(); + + if (!task_pqueue.empty()) + { + if (ready_head) + { + // a new task and an existing task are both ready to go + if (ready_head->context_posted_num < task_pqueue.front()->_posted_num) + { + // run the existing task first + pt_push_back(current); + start_next_fiber(false); + continue; + } + } + + // if we made it here, either there's no ready context, or the ready context is + // scheduled after the ready task, so we shoudl run the task first + run_next_task(); continue; + } // if I have something else to do other than // process tasks... do it. @@ -613,7 +674,7 @@ namespace fc { // ilog( "..." ); // ilog( "ready_push_front" ); if (c != current) - ready_push_front( c ); + add_context_to_ready_list(c); } } return time_point::min(); @@ -627,8 +688,8 @@ namespace fc { return; } - if( c != current ) - ready_push_front(c); + if (c != current) + add_context_to_ready_list(c); } void yield_until( const time_point& tp, bool reschedule ) { @@ -722,7 +783,7 @@ namespace fc { { fc::context* next_blocked = (*iter)->next_blocked; (*iter)->next_blocked = nullptr; - ready_push_front(*iter); + add_context_to_ready_list(*iter); *iter = next_blocked; continue; } @@ -742,7 +803,7 @@ namespace fc { break; } if (!already_on_ready_list) - ready_push_front(*sleep_iter); + add_context_to_ready_list(*sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter); task_removed_from_sleep_pqueue = true; } From 95eb84e62d205214824b56569e316490f3b06eee Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 16 Oct 2014 16:25:12 -0400 Subject: [PATCH 3/8] Add missing include --- src/variant.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/variant.cpp b/src/variant.cpp index df5d716..148ea39 100644 --- a/src/variant.cpp +++ b/src/variant.cpp @@ -9,6 +9,7 @@ #include #include #include +#include namespace fc { From e5666cca54cb8f3c3c5798d514a377a67a979c92 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 16 Oct 2014 16:26:19 -0400 Subject: [PATCH 4/8] Convert ready_head (list of fibers able to run immediately) into a priority heap --- include/fc/thread/priority.hpp | 1 + src/thread/thread.cpp | 161 ++++++++++++++--------- src/thread/thread_d.hpp | 234 ++++++++++++++++++++------------- 3 files changed, 246 insertions(+), 150 deletions(-) diff --git a/include/fc/thread/priority.hpp b/include/fc/thread/priority.hpp index 90749ab..01fd61e 100644 --- a/include/fc/thread/priority.hpp +++ b/include/fc/thread/priority.hpp @@ -15,6 +15,7 @@ namespace fc { } static priority max() { return priority(10000); } static priority min() { return priority(-10000); } + static priority _internal__priority_for_short_sleeps() { return priority(-100000); } int value; }; } diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 60286a0..132833e 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -54,10 +54,6 @@ static void set_thread_name(const char* threadName) #endif namespace fc { -#ifdef ENABLE_FC_THREAD_DEBUG_LOG - FILE* thread_debug_log = fopen("C:/thread_debug.log", "w"); -#endif - const char* thread_name() { return thread::current().name().c_str(); } @@ -191,101 +187,123 @@ namespace fc { } // mark all ready tasks (should be everyone)... as canceled +#ifdef READY_LIST_IS_HEAP + for (fc::context* ready_context : my->ready_heap) + ready_context->canceled = true; +#else cur = my->ready_head; while( cur ) { cur->canceled = true; cur = cur->next; } +#endif my->done = true; // now that we have poked all fibers... switch to the next one and // let them all quit. - while( my->ready_head ) { +#ifdef READY_LIST_IS_HEAP + while (!my->ready_heap.empty()) + { my->start_next_fiber(true); my->check_for_timeouts(); } +#else + while (my->ready_head) + { + my->start_next_fiber(true); + my->check_for_timeouts(); + } +#endif my->clear_free_list(); my->cleanup_thread_specific_data(); } - void thread::exec() { - if( !my->current ) my->current = new fc::context(&fc::thread::current()); - try { + void thread::exec() + { + if( !my->current ) + my->current = new fc::context(&fc::thread::current()); + + try + { my->process_tasks(); } catch( canceled_exception& e ) { - wlog( "thread canceled: ${e}", ("e", e.to_detail_string()) ); + wlog( "thread canceled: ${e}", ("e", e.to_detail_string()) ); } delete my->current; my->current = 0; } - bool thread::is_running()const { + bool thread::is_running()const + { return !my->done; } - priority thread::current_priority()const { + priority thread::current_priority()const + { BOOST_ASSERT(my); - if( my->current ) return my->current->prio; + if( my->current ) + return my->current->prio; return priority(); } - void thread::yield(bool reschedule ) { + void thread::yield(bool reschedule) + { my->check_fiber_exceptions(); my->start_next_fiber(reschedule); my->check_fiber_exceptions(); } - void thread::sleep_until( const time_point& tp ) { - if( tp <= (time_point::now()+fc::microseconds(10000)) ) - { - this->yield(true); - } - my->yield_until( tp, false ); + void thread::sleep_until( const time_point& tp ) + { + if( tp <= (time_point::now()+fc::microseconds(10000)) ) + yield(true); + my->yield_until( tp, false ); } int thread::wait_any_until( std::vector&& p, const time_point& timeout) { - for( size_t i = 0; i < p.size(); ++i ) { - if( p[i]->ready() ) return i; - } + for( size_t i = 0; i < p.size(); ++i ) + if( p[i]->ready() ) + return i; - if( timeout < time_point::now() ) { - fc::stringstream ss; - for( auto i = p.begin(); i != p.end(); ++i ) { - ss << (*i)->get_desc() <<", "; - } - FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) ); + if( timeout < time_point::now() ) + { + fc::stringstream ss; + for( auto i = p.begin(); i != p.end(); ++i ) + ss << (*i)->get_desc() << ", "; + + FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) ); } - if( !my->current ) { + if( !my->current ) my->current = new fc::context(&fc::thread::current()); - } - for( uint32_t i = 0; i < p.size(); ++i ) { - my->current->add_blocking_promise(p[i].get(),false); - }; + for( uint32_t i = 0; i < p.size(); ++i ) + my->current->add_blocking_promise(p[i].get(),false); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { + if( timeout != time_point::maximum() ) + { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), sleep_priority_less() ); } + my->add_to_blocked( my->current ); my->start_next_fiber(); - for( auto i = p.begin(); i != p.end(); ++i ) { - my->current->remove_blocking_promise(i->get()); - } + for( auto i = p.begin(); i != p.end(); ++i ) + my->current->remove_blocking_promise(i->get()); my->check_fiber_exceptions(); - for( uint32_t i = 0; i < p.size(); ++i ) { - if( p[i]->ready() ) return i; - } + for( uint32_t i = 0; i < p.size(); ++i ) + if( p[i]->ready() ) + return i; + //BOOST_THROW_EXCEPTION( wait_any_error() ); return -1; } @@ -327,35 +345,43 @@ namespace fc { thread::current().sleep_until(tp); } - void exec() { + void exec() + { return thread::current().exec(); } - int wait_any( std::vector&& v, const microseconds& timeout_us ) { + int wait_any( std::vector&& v, const microseconds& timeout_us ) + { return thread::current().wait_any_until( fc::move(v), time_point::now() + timeout_us ); } - int wait_any_until( std::vector&& v, const time_point& tp ) { + + int wait_any_until( std::vector&& v, const time_point& tp ) + { return thread::current().wait_any_until( fc::move(v), tp ); } - void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) { - if( p->ready() ) return; + + void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) + { + if( p->ready() ) + return; + if( timeout < time_point::now() ) - FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) ); + FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) ); - if( !my->current ) { + if( !my->current ) my->current = new fc::context(&fc::thread::current()); - } //slog( " %1% blocking on %2%", my->current, p.get() ); - my->current->add_blocking_promise(p.get(),true); + my->current->add_blocking_promise(p.get(), true); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { + if( timeout != time_point::maximum() ) + { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), - sleep_priority_less() ); + sleep_priority_less() ); } // elog( "blocking %1%", my->current ); @@ -372,10 +398,12 @@ namespace fc { my->check_fiber_exceptions(); } - void thread::notify( const promise_base::ptr& p ) { + void thread::notify( const promise_base::ptr& p ) + { //slog( "this %p my %p", this, my ); BOOST_ASSERT(p->ready()); - if( !is_current() ) { + if( !is_current() ) + { this->async( [=](){ notify(p); }, "notify", priority::max() ); return; } @@ -387,14 +415,18 @@ namespace fc { fc::context* cur_blocked = my->blocked; fc::context* prev_blocked = 0; - while( cur_blocked ) { + while( cur_blocked ) + { // if the blocked context is waiting on this promise - if( cur_blocked->try_unblock( p.get() ) ) { + if( cur_blocked->try_unblock( p.get() ) ) + { // remove it from the blocked list. // remove this context from the sleep queue... - for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { - if( my->sleep_pqueue[i] == cur_blocked ) { + for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) + { + if( my->sleep_pqueue[i] == cur_blocked ) + { my->sleep_pqueue[i]->blocking_prom.clear(); my->sleep_pqueue[i] = my->sleep_pqueue.back(); my->sleep_pqueue.pop_back(); @@ -403,22 +435,29 @@ namespace fc { } } auto cur = cur_blocked; - if( prev_blocked ) { + if( prev_blocked ) + { prev_blocked->next_blocked = cur_blocked->next_blocked; cur_blocked = prev_blocked->next_blocked; - } else { + } + else + { my->blocked = cur_blocked->next_blocked; cur_blocked = my->blocked; } cur->next_blocked = 0; my->add_context_to_ready_list( cur ); - } else { // goto the next blocked task + } + else + { // goto the next blocked task prev_blocked = cur_blocked; cur_blocked = cur_blocked->next_blocked; } } } - bool thread::is_current()const { + + bool thread::is_current()const + { return this == ¤t(); } diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 0fd8eb8..8b84ee7 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -8,17 +8,9 @@ #include #include //#include - -//#define ENABLE_FC_THREAD_DEBUG_LOG -#ifdef ENABLE_FC_THREAD_DEBUG_LOG -# define thread_debug_msg(x) do { fprintf x; } while (0) -#else -# define thread_debug_msg(x) do {} while (0) -#endif +#define READY_LIST_IS_HEAP namespace fc { - extern FILE* thread_debug_log; - struct sleep_priority_less { bool operator()( const context::ptr& a, const context::ptr& b ) { return a->resume_time > b->resume_time; @@ -34,8 +26,10 @@ namespace fc { done(false), current(0), pt_head(0), +#ifndef READY_LIST_IS_HEAP ready_head(0), ready_tail(0), +#endif blocked(0), next_unused_task_storage_slot(0) #ifndef NDEBUG @@ -51,12 +45,18 @@ namespace fc { { delete current; fc::context* temp; +#ifdef READY_LIST_IS_HEAP + for (fc::context* ready_context : ready_heap) + delete ready_context; + ready_heap.clear(); +#else while (ready_head) { temp = ready_head->next; delete ready_head; ready_head = temp; } +#endif while (blocked) { temp = blocked->next; @@ -98,8 +98,12 @@ namespace fc { fc::context* pt_head; // list of contexts that can be reused for new tasks +#ifdef READY_LIST_IS_HEAP + std::vector ready_heap; +#else fc::context* ready_head; // linked list (using 'next') of contexts that are ready to run fc::context* ready_tail; +#endif fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait() @@ -182,68 +186,96 @@ namespace fc { */ } - fc::context::ptr ready_pop_front() - { - fc::context::ptr tmp = nullptr; - if( ready_head ) - { - tmp = ready_head; - ready_head = tmp->next; - if( !ready_head ) - ready_tail = nullptr; - tmp->next = nullptr; - } - return tmp; - } - - void add_context_to_ready_list(context* context_to_add) - { - if (!ready_tail) - ready_head = context_to_add; - else - { - context_to_add->context_posted_num = ++next_posted_num; - ready_tail->next = context_to_add; - } - ready_tail = context_to_add; - } - -#if 0 - void ready_push_front(const fc::context::ptr& context_to_push) + fc::context::ptr ready_pop_front() { - BOOST_ASSERT(context_to_push->next == nullptr); - BOOST_ASSERT(context_to_push != current); - - context** iter = &ready_head; - while (*iter && (*iter)->resume_time > context_to_push->resume_time) - iter = &((*iter)->next); - context_to_push->next = *iter; - *iter = context_to_push; - if (!context_to_push->next) - ready_tail = context_to_push; - } - - void ready_push_back(const fc::context::ptr& context_to_push) - { - BOOST_ASSERT(context_to_push->next == nullptr); - BOOST_ASSERT(context_to_push != current); - - if (!ready_tail) - ready_head = context_to_push; - else +#ifdef READY_LIST_IS_HEAP + fc::context* highest_priority_context = ready_heap.front(); + std::pop_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); + ready_heap.pop_back(); + return highest_priority_context; +#else + fc::context::ptr tmp = nullptr; + if( ready_head ) { - if (context_to_push->resume_time <= ready_tail->resume_time) - context_to_push->resume_time = ready_tail->resume_time + fc::microseconds(1); - ready_tail->next = context_to_push; + tmp = ready_head; + ready_head = tmp->next; + if( !ready_head ) + ready_tail = nullptr; + tmp->next = nullptr; } - ready_tail = context_to_push; - } + return tmp; #endif + } + + void add_context_to_ready_list(context* context_to_add, bool at_end = false) + { + +#ifdef READY_LIST_IS_HEAP + context_to_add->context_posted_num = next_posted_num++; + ready_heap.push_back(context_to_add); + std::push_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); +#else +# if 1 + if (at_end) + { + if (!ready_tail) + { + ready_head = context_to_add; + context_to_add->context_posted_num = next_posted_num + 100000; + } + else + { + context_to_add->context_posted_num = next_posted_num++; + ready_tail->next = context_to_add; + } + ready_tail = context_to_add; + } + else + { + context_to_add->context_posted_num = next_posted_num++; + context_to_add->next = ready_head; + ready_head = context_to_add; + if (!ready_tail) + ready_tail = context_to_add; + } +# else + if (!ready_tail) + ready_head = context_to_add; + else + { + context_to_add->context_posted_num = next_posted_num++; + ready_tail->next = context_to_add; + } + ready_tail = context_to_add; +# endif +#endif + } + struct task_priority_less { - bool operator()( task_base* a, task_base* b ) + bool operator()(const task_base* a, const task_base* b) const { - return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num); + return a->_prio.value < b->_prio.value ? true : + (a->_prio.value > b->_prio.value ? false : + a->_posted_num > b->_posted_num); + } + bool operator()(const task_base* a, const context* b) const + { + return a->_prio.value < b->prio.value ? true : + (a->_prio.value > b->prio.value ? false : + a->_posted_num > b->context_posted_num); + } + bool operator()(const context* a, const task_base* b) const + { + return a->prio.value < b->_prio.value ? true : + (a->prio.value > b->_prio.value ? false : + a->context_posted_num > b->_posted_num); + } + bool operator()(const context* a, const context* b) const + { + return a->prio.value < b->prio.value ? true : + (a->prio.value > b->prio.value ? false : + a->context_posted_num > b->context_posted_num); } }; @@ -406,29 +438,34 @@ namespace fc { if( !current ) current = new fc::context( &fc::thread::current() ); + priority original_priority = current->prio; + // check to see if any other contexts are ready - if( ready_head ) +#ifdef READY_LIST_IS_HEAP + if (!ready_heap.empty()) +#else + if (ready_head) +#endif { fc::context* next = ready_pop_front(); - if( next == current ) + if (next == current) { // elog( "next == current... something went wrong" ); assert(next != current); return false; } - BOOST_ASSERT( next != current ); + BOOST_ASSERT(next != current); // jump to next context, saving current context fc::context* prev = current; current = next; if (reschedule) - add_context_to_ready_list(prev); + { + current->prio = priority::_internal__priority_for_short_sleeps(); + add_context_to_ready_list(prev, true); + } // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); - thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", - name.c_str(), - prev->cur_task ? prev->cur_task->get_desc() : "unknown", - next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); #elif BOOST_VERSION >= 105300 @@ -464,14 +501,13 @@ namespace fc { current = next; if( reschedule ) - add_context_to_ready_list(prev); + { + current->prio = priority::_internal__priority_for_short_sleeps(); + add_context_to_ready_list(prev, true); + } // slog( "jump to %p from %p", next, prev ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); - thread_debug_msg((thread_debug_log, "EMF: [%s] \"%s\" -> \"%s\"\n", - name.c_str(), - prev->cur_task ? prev->cur_task->get_desc() : "unknown", - next->cur_task ? next->cur_task->get_desc() : "unknown")); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); #elif BOOST_VERSION >= 105300 @@ -484,6 +520,9 @@ namespace fc { //current = prev; } + if (reschedule) + current->prio = original_priority; + if( current->canceled ) { //current->canceled = false; @@ -525,9 +564,6 @@ namespace fc { next->_set_active_context( current ); current->cur_task = next; - - thread_debug_msg((thread_debug_log, "EMF: [%s] starting task \"%s\"\n", name.c_str(), next->get_desc())); - next->run(); current->cur_task = 0; next->_set_active_context(0); @@ -563,10 +599,20 @@ namespace fc { if (!task_pqueue.empty()) { - if (ready_head) +#if 1 + if (task_pqueue.front()->_prio.value != priority::max().value && +#ifdef READY_LIST_IS_HEAP + !ready_heap.empty()) +#else + ready_head) +#endif { // a new task and an existing task are both ready to go +#ifdef READY_LIST_IS_HEAP + if (task_priority_less()(ready_heap.front(), task_pqueue.front())) +#else if (ready_head->context_posted_num < task_pqueue.front()->_posted_num) +#endif { // run the existing task first pt_push_back(current); @@ -574,16 +620,21 @@ namespace fc { continue; } } +#endif // if we made it here, either there's no ready context, or the ready context is - // scheduled after the ready task, so we shoudl run the task first + // scheduled after the ready task, so we should run the task first run_next_task(); continue; } // if I have something else to do other than // process tasks... do it. - if( ready_head ) +#ifdef READY_LIST_IS_HEAP + if (!ready_heap.empty()) +#else + if (ready_head) +#endif { pt_push_back( current ); start_next_fiber(false); @@ -597,14 +648,14 @@ namespace fc { { // lock scope boost::unique_lock lock(task_ready_mutex); - if( has_next_task() ) continue; + if( has_next_task() ) + continue; time_point timeout_time = check_for_timeouts(); - if( done ) return; + if( done ) + return; if( timeout_time == time_point::maximum() ) - { task_ready.wait( lock ); - } else if( timeout_time != time_point::min() ) { // there may be tasks that have been canceled we should filter them out now @@ -647,9 +698,9 @@ namespace fc { } time_point next = time_point::maximum(); - if( sleep_pqueue.size() && next > sleep_pqueue.front()->resume_time ) + if( !sleep_pqueue.empty() && next > sleep_pqueue.front()->resume_time ) next = sleep_pqueue.front()->resume_time; - if( task_sch_queue.size() && next > task_sch_queue.front()->_when ) + if( !task_sch_queue.empty() && next > task_sch_queue.front()->_when ) next = task_sch_queue.front()->_when; time_point now = time_point::now(); @@ -795,6 +846,10 @@ namespace fc { { if ((*sleep_iter)->canceled) { +#ifdef READY_LIST_IS_HEAP + bool already_on_ready_list = std::find(ready_heap.begin(), ready_heap.end(), + *sleep_iter) != ready_heap.end(); +#else bool already_on_ready_list = false; for (fc::context* ready_iter = ready_head; ready_iter; ready_iter = ready_iter->next) if (ready_iter == *sleep_iter) @@ -802,6 +857,7 @@ namespace fc { already_on_ready_list = true; break; } +#endif if (!already_on_ready_list) add_context_to_ready_list(*sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter); From b34a222dc5dd33d6fc5130ec11b51389ea0c7304 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 16 Oct 2014 17:50:58 -0400 Subject: [PATCH 5/8] Remove #ifdefed-out code --- src/thread/thread.cpp | 16 ------- src/thread/thread_d.hpp | 98 +---------------------------------------- 2 files changed, 1 insertion(+), 113 deletions(-) diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 132833e..d93bba4 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -187,33 +187,17 @@ namespace fc { } // mark all ready tasks (should be everyone)... as canceled -#ifdef READY_LIST_IS_HEAP for (fc::context* ready_context : my->ready_heap) ready_context->canceled = true; -#else - cur = my->ready_head; - while( cur ) { - cur->canceled = true; - cur = cur->next; - } -#endif my->done = true; // now that we have poked all fibers... switch to the next one and // let them all quit. -#ifdef READY_LIST_IS_HEAP while (!my->ready_heap.empty()) { my->start_next_fiber(true); my->check_for_timeouts(); } -#else - while (my->ready_head) - { - my->start_next_fiber(true); - my->check_for_timeouts(); - } -#endif my->clear_free_list(); my->cleanup_thread_specific_data(); } diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 8b84ee7..e3635f9 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -8,7 +8,6 @@ #include #include //#include -#define READY_LIST_IS_HEAP namespace fc { struct sleep_priority_less { @@ -26,10 +25,6 @@ namespace fc { done(false), current(0), pt_head(0), -#ifndef READY_LIST_IS_HEAP - ready_head(0), - ready_tail(0), -#endif blocked(0), next_unused_task_storage_slot(0) #ifndef NDEBUG @@ -45,18 +40,9 @@ namespace fc { { delete current; fc::context* temp; -#ifdef READY_LIST_IS_HEAP for (fc::context* ready_context : ready_heap) delete ready_context; ready_heap.clear(); -#else - while (ready_head) - { - temp = ready_head->next; - delete ready_head; - ready_head = temp; - } -#endif while (blocked) { temp = blocked->next; @@ -98,12 +84,7 @@ namespace fc { fc::context* pt_head; // list of contexts that can be reused for new tasks -#ifdef READY_LIST_IS_HEAP - std::vector ready_heap; -#else - fc::context* ready_head; // linked list (using 'next') of contexts that are ready to run - fc::context* ready_tail; -#endif + std::vector ready_heap; // priority heap of contexts that are ready to run fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait() @@ -188,67 +169,18 @@ namespace fc { fc::context::ptr ready_pop_front() { -#ifdef READY_LIST_IS_HEAP fc::context* highest_priority_context = ready_heap.front(); std::pop_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); ready_heap.pop_back(); return highest_priority_context; -#else - fc::context::ptr tmp = nullptr; - if( ready_head ) - { - tmp = ready_head; - ready_head = tmp->next; - if( !ready_head ) - ready_tail = nullptr; - tmp->next = nullptr; - } - return tmp; -#endif } void add_context_to_ready_list(context* context_to_add, bool at_end = false) { -#ifdef READY_LIST_IS_HEAP context_to_add->context_posted_num = next_posted_num++; ready_heap.push_back(context_to_add); std::push_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); -#else -# if 1 - if (at_end) - { - if (!ready_tail) - { - ready_head = context_to_add; - context_to_add->context_posted_num = next_posted_num + 100000; - } - else - { - context_to_add->context_posted_num = next_posted_num++; - ready_tail->next = context_to_add; - } - ready_tail = context_to_add; - } - else - { - context_to_add->context_posted_num = next_posted_num++; - context_to_add->next = ready_head; - ready_head = context_to_add; - if (!ready_tail) - ready_tail = context_to_add; - } -# else - if (!ready_tail) - ready_head = context_to_add; - else - { - context_to_add->context_posted_num = next_posted_num++; - ready_tail->next = context_to_add; - } - ready_tail = context_to_add; -# endif -#endif } struct task_priority_less @@ -441,11 +373,7 @@ namespace fc { priority original_priority = current->prio; // check to see if any other contexts are ready -#ifdef READY_LIST_IS_HEAP if (!ready_heap.empty()) -#else - if (ready_head) -#endif { fc::context* next = ready_pop_front(); if (next == current) @@ -599,20 +527,11 @@ namespace fc { if (!task_pqueue.empty()) { -#if 1 if (task_pqueue.front()->_prio.value != priority::max().value && -#ifdef READY_LIST_IS_HEAP !ready_heap.empty()) -#else - ready_head) -#endif { // a new task and an existing task are both ready to go -#ifdef READY_LIST_IS_HEAP if (task_priority_less()(ready_heap.front(), task_pqueue.front())) -#else - if (ready_head->context_posted_num < task_pqueue.front()->_posted_num) -#endif { // run the existing task first pt_push_back(current); @@ -620,7 +539,6 @@ namespace fc { continue; } } -#endif // if we made it here, either there's no ready context, or the ready context is // scheduled after the ready task, so we should run the task first @@ -630,11 +548,7 @@ namespace fc { // if I have something else to do other than // process tasks... do it. -#ifdef READY_LIST_IS_HEAP if (!ready_heap.empty()) -#else - if (ready_head) -#endif { pt_push_back( current ); start_next_fiber(false); @@ -846,18 +760,8 @@ namespace fc { { if ((*sleep_iter)->canceled) { -#ifdef READY_LIST_IS_HEAP bool already_on_ready_list = std::find(ready_heap.begin(), ready_heap.end(), *sleep_iter) != ready_heap.end(); -#else - bool already_on_ready_list = false; - for (fc::context* ready_iter = ready_head; ready_iter; ready_iter = ready_iter->next) - if (ready_iter == *sleep_iter) - { - already_on_ready_list = true; - break; - } -#endif if (!already_on_ready_list) add_context_to_ready_list(*sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter); From 227767a425d6ffb9a8c6c022842ac994e7356ae7 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 16 Oct 2014 19:00:30 -0400 Subject: [PATCH 6/8] Fix type of PRECISION --- src/real128.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/real128.cpp b/src/real128.cpp index c5ae72e..4d1e44a 100644 --- a/src/real128.cpp +++ b/src/real128.cpp @@ -2,8 +2,9 @@ #include #include #include +#include -#define PRECISION (1000000ll * 1000000ll * 1000000ll) +#define PRECISION (UINT64_C(1000000) * UINT64_C(1000000) * UINT64_C(1000000)) namespace fc { From b026e824510ccb4f4caad5a4b4a2d7566f96c8c1 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Fri, 17 Oct 2014 12:04:21 -0400 Subject: [PATCH 7/8] Remove task priority hack that is no longer needed --- src/thread/thread_d.hpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index e3635f9..5b357d2 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -527,11 +527,10 @@ namespace fc { if (!task_pqueue.empty()) { - if (task_pqueue.front()->_prio.value != priority::max().value && - !ready_heap.empty()) + if (!ready_heap.empty()) { // a new task and an existing task are both ready to go - if (task_priority_less()(ready_heap.front(), task_pqueue.front())) + if (task_priority_less()(task_pqueue.front(), ready_heap.front())) { // run the existing task first pt_push_back(current); From 7ddf88245906bc8f08a48ebe992ace63922bf439 Mon Sep 17 00:00:00 2001 From: Vikram Rajkumar Date: Fri, 17 Oct 2014 13:23:51 -0400 Subject: [PATCH 8/8] Fix linux build --- src/network/ntp.cpp | 4 ++-- src/real128.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 7a08d3a..bd7d43a 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -51,7 +51,7 @@ namespace fc { uint64_t ntp_timestamp_host = bswap_64(ntp_timestamp_net_order); uint32_t fractional_seconds = ntp_timestamp_host & 0xffffffff; - uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (UINT64_C(1) << 31)) >> 32); + uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (uint64_t(1) << 31)) >> 32); uint32_t seconds_since_1900 = ntp_timestamp_host >> 32; uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; return fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds); @@ -63,7 +63,7 @@ namespace fc uint32_t seconds_since_epoch = (uint32_t)(microseconds_since_epoch / 1000000); uint32_t seconds_since_1900 = seconds_since_epoch + 2208988800; uint32_t microseconds = microseconds_since_epoch % 1000000; - uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (UINT64_C(1) << 31)) / 1000000); + uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (uint64_t(1) << 31)) / 1000000); uint64_t ntp_timestamp_net_order = ((uint64_t)seconds_since_1900 << 32) + fractional_seconds; return bswap_64(ntp_timestamp_net_order); } diff --git a/src/real128.cpp b/src/real128.cpp index 4d1e44a..3f5b431 100644 --- a/src/real128.cpp +++ b/src/real128.cpp @@ -4,7 +4,7 @@ #include #include -#define PRECISION (UINT64_C(1000000) * UINT64_C(1000000) * UINT64_C(1000000)) +#define PRECISION (uint64_t(1000000) * uint64_t(1000000) * uint64_t(1000000)) namespace fc {