diff --git a/include/fc/thread/priority.hpp b/include/fc/thread/priority.hpp index 90749ab..01fd61e 100644 --- a/include/fc/thread/priority.hpp +++ b/include/fc/thread/priority.hpp @@ -15,6 +15,7 @@ namespace fc { } static priority max() { return priority(10000); } static priority min() { return priority(-10000); } + static priority _internal__priority_for_short_sleeps() { return priority(-100000); } int value; }; } diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 7a08d3a..bd7d43a 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -51,7 +51,7 @@ namespace fc { uint64_t ntp_timestamp_host = bswap_64(ntp_timestamp_net_order); uint32_t fractional_seconds = ntp_timestamp_host & 0xffffffff; - uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (UINT64_C(1) << 31)) >> 32); + uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (uint64_t(1) << 31)) >> 32); uint32_t seconds_since_1900 = ntp_timestamp_host >> 32; uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; return fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds); @@ -63,7 +63,7 @@ namespace fc uint32_t seconds_since_epoch = (uint32_t)(microseconds_since_epoch / 1000000); uint32_t seconds_since_1900 = seconds_since_epoch + 2208988800; uint32_t microseconds = microseconds_since_epoch % 1000000; - uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (UINT64_C(1) << 31)) / 1000000); + uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (uint64_t(1) << 31)) / 1000000); uint64_t ntp_timestamp_net_order = ((uint64_t)seconds_since_1900 << 32) + fractional_seconds; return bswap_64(ntp_timestamp_net_order); } diff --git a/src/real128.cpp b/src/real128.cpp index c5ae72e..3f5b431 100644 --- a/src/real128.cpp +++ b/src/real128.cpp @@ -2,8 +2,9 @@ #include #include #include +#include -#define PRECISION (1000000ll * 1000000ll * 1000000ll) +#define PRECISION 
(uint64_t(1000000) * uint64_t(1000000) * uint64_t(1000000)) namespace fc { diff --git a/src/thread/context.hpp b/src/thread/context.hpp index 9801499..f7d15e9 100644 --- a/src/thread/context.hpp +++ b/src/thread/context.hpp @@ -60,7 +60,8 @@ namespace fc { cancellation_reason(nullptr), #endif complete(false), - cur_task(0) + cur_task(0), + context_posted_num(0) { #if BOOST_VERSION >= 105600 size_t stack_size = stack_allocator::traits_type::default_size() * 4; @@ -99,7 +100,8 @@ namespace fc { cancellation_reason(nullptr), #endif complete(false), - cur_task(0) + cur_task(0), + context_posted_num(0) {} ~context() { @@ -229,6 +231,7 @@ namespace fc { #endif bool complete; task_base* cur_task; + uint64_t context_posted_num; // serial number set each time the context is added to the ready list }; } // naemspace fc diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index ea12d24..d93bba4 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -173,7 +173,7 @@ namespace fc { // move all sleep tasks to ready for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { - my->ready_push_front( my->sleep_pqueue[i] ); + my->add_context_to_ready_list( my->sleep_pqueue[i] ); } my->sleep_pqueue.clear(); @@ -182,21 +182,19 @@ namespace fc { while( cur ) { fc::context* n = cur->next; cur->next = 0; - my->ready_push_front( cur ); + my->add_context_to_ready_list( cur ); cur = n; } // mark all ready tasks (should be everyone)... as canceled - cur = my->ready_head; - while( cur ) { - cur->canceled = true; - cur = cur->next; - } + for (fc::context* ready_context : my->ready_heap) + ready_context->canceled = true; my->done = true; // now that we have poked all fibers... switch to the next one and // let them all quit. 
- while( my->ready_head ) { + while (!my->ready_heap.empty()) + { my->start_next_fiber(true); my->check_for_timeouts(); } @@ -204,90 +202,98 @@ namespace fc { my->cleanup_thread_specific_data(); } - void thread::exec() { - if( !my->current ) my->current = new fc::context(&fc::thread::current()); - try { + void thread::exec() + { + if( !my->current ) + my->current = new fc::context(&fc::thread::current()); + + try + { my->process_tasks(); } catch( canceled_exception& e ) { - wlog( "thread canceled: ${e}", ("e", e.to_detail_string()) ); + wlog( "thread canceled: ${e}", ("e", e.to_detail_string()) ); } delete my->current; my->current = 0; } - bool thread::is_running()const { + bool thread::is_running()const + { return !my->done; } - priority thread::current_priority()const { + priority thread::current_priority()const + { BOOST_ASSERT(my); - if( my->current ) return my->current->prio; + if( my->current ) + return my->current->prio; return priority(); } - void thread::yield(bool reschedule ) { + void thread::yield(bool reschedule) + { my->check_fiber_exceptions(); my->start_next_fiber(reschedule); my->check_fiber_exceptions(); } - void thread::sleep_until( const time_point& tp ) { - if( tp <= (time_point::now()+fc::microseconds(10000)) ) - { - this->yield(true); - } - my->yield_until( tp, false ); + void thread::sleep_until( const time_point& tp ) + { + if( tp <= (time_point::now()+fc::microseconds(10000)) ) + yield(true); + my->yield_until( tp, false ); } int thread::wait_any_until( std::vector&& p, const time_point& timeout) { - for( size_t i = 0; i < p.size(); ++i ) { - if( p[i]->ready() ) return i; - } + for( size_t i = 0; i < p.size(); ++i ) + if( p[i]->ready() ) + return i; - if( timeout < time_point::now() ) { - fc::stringstream ss; - for( auto i = p.begin(); i != p.end(); ++i ) { - ss << (*i)->get_desc() <<", "; - } - FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) ); + if( timeout < time_point::now() ) + { + fc::stringstream ss; + for( auto 
i = p.begin(); i != p.end(); ++i ) + ss << (*i)->get_desc() << ", "; + + FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) ); } - if( !my->current ) { + if( !my->current ) my->current = new fc::context(&fc::thread::current()); - } - for( uint32_t i = 0; i < p.size(); ++i ) { - my->current->add_blocking_promise(p[i].get(),false); - }; + for( uint32_t i = 0; i < p.size(); ++i ) + my->current->add_blocking_promise(p[i].get(),false); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { + if( timeout != time_point::maximum() ) + { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), sleep_priority_less() ); } + my->add_to_blocked( my->current ); my->start_next_fiber(); - for( auto i = p.begin(); i != p.end(); ++i ) { - my->current->remove_blocking_promise(i->get()); - } + for( auto i = p.begin(); i != p.end(); ++i ) + my->current->remove_blocking_promise(i->get()); my->check_fiber_exceptions(); - for( uint32_t i = 0; i < p.size(); ++i ) { - if( p[i]->ready() ) return i; - } + for( uint32_t i = 0; i < p.size(); ++i ) + if( p[i]->ready() ) + return i; + //BOOST_THROW_EXCEPTION( wait_any_error() ); return -1; } void thread::async_task( task_base* t, const priority& p ) { - async_task( t, p, time_point::min() ); + async_task( t, p, time_point::min() ); } void thread::poke() { @@ -323,35 +329,43 @@ namespace fc { thread::current().sleep_until(tp); } - void exec() { + void exec() + { return thread::current().exec(); } - int wait_any( std::vector&& v, const microseconds& timeout_us ) { + int wait_any( std::vector&& v, const microseconds& timeout_us ) + { return thread::current().wait_any_until( fc::move(v), time_point::now() + timeout_us ); } - int wait_any_until( std::vector&& v, const time_point& tp ) { + + int wait_any_until( std::vector&& v, const time_point& tp ) + { return thread::current().wait_any_until( fc::move(v), tp ); } - 
void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) { - if( p->ready() ) return; + + void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) + { + if( p->ready() ) + return; + if( timeout < time_point::now() ) - FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) ); + FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) ); - if( !my->current ) { + if( !my->current ) my->current = new fc::context(&fc::thread::current()); - } //slog( " %1% blocking on %2%", my->current, p.get() ); - my->current->add_blocking_promise(p.get(),true); + my->current->add_blocking_promise(p.get(), true); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { + if( timeout != time_point::maximum() ) + { my->current->resume_time = timeout; my->sleep_pqueue.push_back(my->current); std::push_heap( my->sleep_pqueue.begin(), my->sleep_pqueue.end(), - sleep_priority_less() ); + sleep_priority_less() ); } // elog( "blocking %1%", my->current ); @@ -368,10 +382,12 @@ namespace fc { my->check_fiber_exceptions(); } - void thread::notify( const promise_base::ptr& p ) { + void thread::notify( const promise_base::ptr& p ) + { //slog( "this %p my %p", this, my ); BOOST_ASSERT(p->ready()); - if( !is_current() ) { + if( !is_current() ) + { this->async( [=](){ notify(p); }, "notify", priority::max() ); return; } @@ -383,14 +399,18 @@ namespace fc { fc::context* cur_blocked = my->blocked; fc::context* prev_blocked = 0; - while( cur_blocked ) { + while( cur_blocked ) + { // if the blocked context is waiting on this promise - if( cur_blocked->try_unblock( p.get() ) ) { + if( cur_blocked->try_unblock( p.get() ) ) + { // remove it from the blocked list. // remove this context from the sleep queue... 
- for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { - if( my->sleep_pqueue[i] == cur_blocked ) { + for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) + { + if( my->sleep_pqueue[i] == cur_blocked ) + { my->sleep_pqueue[i]->blocking_prom.clear(); my->sleep_pqueue[i] = my->sleep_pqueue.back(); my->sleep_pqueue.pop_back(); @@ -399,22 +419,29 @@ namespace fc { } } auto cur = cur_blocked; - if( prev_blocked ) { + if( prev_blocked ) + { prev_blocked->next_blocked = cur_blocked->next_blocked; cur_blocked = prev_blocked->next_blocked; - } else { + } + else + { my->blocked = cur_blocked->next_blocked; cur_blocked = my->blocked; } cur->next_blocked = 0; - my->ready_push_front( cur ); - } else { // goto the next blocked task + my->add_context_to_ready_list( cur ); + } + else + { // goto the next blocked task prev_blocked = cur_blocked; cur_blocked = cur_blocked->next_blocked; } } } - bool thread::is_current()const { + + bool thread::is_current()const + { return this == ¤t(); } diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 624d580..5b357d2 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -21,12 +21,10 @@ namespace fc { thread_d(fc::thread& s) :self(s), boost_thread(0), task_in_queue(0), - next_task_posted_num(1), + next_posted_num(1), done(false), current(0), pt_head(0), - ready_head(0), - ready_tail(0), blocked(0), next_unused_task_storage_slot(0) #ifndef NDEBUG @@ -37,15 +35,14 @@ namespace fc { name = fc::string("th_") + char('a'+cnt++); // printf("thread=%p\n",this); } - ~thread_d(){ + + ~thread_d() + { delete current; fc::context* temp; - while (ready_head) - { - temp = ready_head->next; - delete ready_head; - ready_head = temp; - } + for (fc::context* ready_context : ready_heap) + delete ready_context; + ready_heap.clear(); while (blocked) { temp = blocked->next; @@ -66,7 +63,8 @@ namespace fc { boost_thread->detach(); delete boost_thread; } - } + } + fc::thread& self; boost::thread* boost_thread; stack_allocator 
stack_alloc; @@ -75,7 +73,7 @@ namespace fc { boost::atomic task_in_queue; std::vector task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time - uint64_t next_task_posted_num; // each task gets assigned a number in the order it is started, tracked here + uint64_t next_posted_num; // each task or context gets assigned a number in the order it is ready to execute, tracked here std::vector task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run std::vector sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume std::vector free_list; // list of unused contexts that are ready for deletion @@ -86,8 +84,7 @@ namespace fc { fc::context* pt_head; // list of contexts that can be reused for new tasks - fc::context* ready_head; // linked list (using 'next') of contexts that are ready to run - fc::context* ready_tail; + std::vector ready_heap; // priority heap of contexts that are ready to run fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait() @@ -149,12 +146,14 @@ namespace fc { } #endif // insert at from of blocked linked list - inline void add_to_blocked( fc::context* c ) { + inline void add_to_blocked( fc::context* c ) + { c->next_blocked = blocked; blocked = c; } - void pt_push_back(fc::context* c) { + void pt_push_back(fc::context* c) + { c->next = pt_head; pt_head = c; /* @@ -167,49 +166,58 @@ namespace fc { wlog( "idle context...%2% %1%", c, i ); */ } - fc::context::ptr ready_pop_front() { - fc::context::ptr tmp = nullptr; - if( ready_head ) { - tmp = ready_head; - ready_head = tmp->next; - if( !ready_head ) - ready_tail = nullptr; - tmp->next = nullptr; - } - return tmp; + + fc::context::ptr ready_pop_front() + { + fc::context* highest_priority_context = ready_heap.front(); + std::pop_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); + 
ready_heap.pop_back(); + return highest_priority_context; + } + + void add_context_to_ready_list(context* context_to_add, bool at_end = false) + { + + context_to_add->context_posted_num = next_posted_num++; + ready_heap.push_back(context_to_add); + std::push_heap(ready_heap.begin(), ready_heap.end(), task_priority_less()); } - void ready_push_front( const fc::context::ptr& c ) { - BOOST_ASSERT( c->next == nullptr ); - BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); - c->next = ready_head; - ready_head = c; - if( !ready_tail ) - ready_tail = c; - } - void ready_push_back( const fc::context::ptr& c ) { - BOOST_ASSERT( c->next == nullptr ); - BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); - c->next = 0; - if( ready_tail ) { - ready_tail->next = c; - } else { - assert( !ready_head ); - ready_head = c; - } - ready_tail = c; - } - struct task_priority_less { - bool operator()( task_base* a, task_base* b ) { - return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num ); - } - }; - struct task_when_less { - bool operator()( task_base* a, task_base* b ) { - return a->_when > b->_when; - } - }; + + struct task_priority_less + { + bool operator()(const task_base* a, const task_base* b) const + { + return a->_prio.value < b->_prio.value ? true : + (a->_prio.value > b->_prio.value ? false : + a->_posted_num > b->_posted_num); + } + bool operator()(const task_base* a, const context* b) const + { + return a->_prio.value < b->prio.value ? true : + (a->_prio.value > b->prio.value ? false : + a->_posted_num > b->context_posted_num); + } + bool operator()(const context* a, const task_base* b) const + { + return a->prio.value < b->_prio.value ? true : + (a->prio.value > b->_prio.value ? 
false : + a->context_posted_num > b->_posted_num); + } + bool operator()(const context* a, const context* b) const + { + return a->prio.value < b->prio.value ? true : + (a->prio.value > b->prio.value ? false : + a->context_posted_num > b->context_posted_num); + } + }; + + struct task_when_less + { + bool operator()( task_base* a, task_base* b ) + { + return a->_when > b->_when; + } + }; void enqueue( task_base* t ) { @@ -229,7 +237,7 @@ namespace fc { } cur = t; - next_task_posted_num += num_ready_tasks; + next_posted_num += num_ready_tasks; unsigned tasks_posted = 0; while (cur) { @@ -241,7 +249,7 @@ namespace fc { } else { - cur->_posted_num = next_task_posted_num - (++tasks_posted); + cur->_posted_num = next_posted_num - (++tasks_posted); task_pqueue.push_back(cur); std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less()); @@ -251,31 +259,46 @@ namespace fc { } } - task_base* dequeue() { + void move_newly_scheduled_tasks_to_task_pqueue() + { + BOOST_ASSERT(this == thread::current().my); + + // first, if there are any new tasks on 'task_in_queue', which is tasks that + // have been just been async or scheduled, but we haven't processed them. + // move them into the task_sch_queue or task_pqueue, as appropriate + + //DLN: changed from memory_order_consume for boost 1.55. + //This appears to be safest replacement for now, maybe + //can be changed to relaxed later, but needs analysis. 
+ task_base* pending_list = task_in_queue.exchange(0, boost::memory_order_seq_cst); + if (pending_list) + enqueue(pending_list); + + // second, walk through task_sch_queue and move any scheduled tasks that are now + // able to run (because their scheduled time has arrived) to task_pqueue + + while (!task_sch_queue.empty() && + task_sch_queue.front()->_when <= time_point::now()) + { + task_base* ready_task = task_sch_queue.front(); + std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less()); + task_sch_queue.pop_back(); + + ready_task->_posted_num = next_posted_num++; + task_pqueue.push_back(ready_task); + std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less()); + } + } + + task_base* dequeue() + { // get a new task BOOST_ASSERT( this == thread::current().my ); - - task_base* pending = 0; - //DLN: changed from memory_order_consume for boost 1.55. - //This appears to be safest replacement for now, maybe - //can be changed to relaxed later, but needs analysis. - pending = task_in_queue.exchange(0,boost::memory_order_seq_cst); - if( pending ) { enqueue( pending ); } - task_base* p(0); - if( task_sch_queue.size() ) { - if( task_sch_queue.front()->_when <= time_point::now() ) { - p = task_sch_queue.front(); - std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() ); - task_sch_queue.pop_back(); - return p; - } - } - if( task_pqueue.size() ) { - p = task_pqueue.front(); - std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); - task_pqueue.pop_back(); - } + assert(!task_pqueue.empty()); + task_base* p = task_pqueue.front(); + std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); + task_pqueue.pop_back(); return p; } @@ -307,7 +330,8 @@ namespace fc { * This should be before or after a context switch to * detect quit/cancel operations and throw an exception. 
*/ - void check_fiber_exceptions() { + void check_fiber_exceptions() + { if( current && current->canceled ) { #ifdef NDEBUG FC_THROW_EXCEPTION( canceled_exception, "" ); @@ -317,7 +341,7 @@ namespace fc { } else if( done ) { ilog( "throwing canceled exception" ); FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" ); - // BOOST_THROW_EXCEPTION( thread_quit() ); + // BOOST_THROW_EXCEPTION( thread_quit() ); } } @@ -326,7 +350,8 @@ namespace fc { * If none are available then create a new context and * have it wait for something to do. */ - bool start_next_fiber( bool reschedule = false ) { + bool start_next_fiber( bool reschedule = false ) + { /* If this assert fires, it means you are executing an operation that is causing * the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect * (somewhere up the stack) */ @@ -342,54 +367,75 @@ namespace fc { assert(std::current_exception() == std::exception_ptr()); check_for_timeouts(); - if( !current ) current = new fc::context( &fc::thread::current() ); + if( !current ) + current = new fc::context( &fc::thread::current() ); + + priority original_priority = current->prio; // check to see if any other contexts are ready - if( ready_head ) { + if (!ready_heap.empty()) + { fc::context* next = ready_pop_front(); - if( next == current ) { + if (next == current) + { // elog( "next == current... 
something went wrong" ); assert(next != current); - return false; + return false; } - BOOST_ASSERT( next != current ); + BOOST_ASSERT(next != current); // jump to next context, saving current context fc::context* prev = current; current = next; - if( reschedule ) ready_push_back(prev); - // slog( "jump to %p from %p", next, prev ); - // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + if (reschedule) + { + current->prio = priority::_internal__priority_for_short_sleeps(); + add_context_to_ready_list(prev, true); + } + // slog( "jump to %p from %p", next, prev ); + // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); #if BOOST_VERSION >= 105600 - bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); + bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); #elif BOOST_VERSION >= 105300 - bc::jump_fcontext( prev->my_context, next->my_context, 0 ); + bc::jump_fcontext( prev->my_context, next->my_context, 0 ); #else - bc::jump_fcontext( &prev->my_context, &next->my_context, 0 ); + bc::jump_fcontext( &prev->my_context, &next->my_context, 0 ); #endif - BOOST_ASSERT( current ); - BOOST_ASSERT( current == prev ); + BOOST_ASSERT( current ); + BOOST_ASSERT( current == prev ); //current = prev; - } else { // all contexts are blocked, create a new context - // that will process posted tasks... + } + else + { + // all contexts are blocked, create a new context + // that will process posted tasks... fc::context* prev = current; fc::context* next = nullptr; - if( pt_head ) { // grab cached context + if( pt_head ) + { + // grab cached context next = pt_head; pt_head = pt_head->next; next->next = 0; next->reinitialize(); - } else { // create new context. + } + else + { + // create new context. 
next = new fc::context( &thread_d::start_process_tasks, stack_alloc, - &fc::thread::current() ); + &fc::thread::current() ); } current = next; - if( reschedule ) ready_push_back(prev); + if( reschedule ) + { + current->prio = priority::_internal__priority_for_short_sleeps(); + add_context_to_ready_list(prev, true); + } - // slog( "jump to %p from %p", next, prev ); - // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + // slog( "jump to %p from %p", next, prev ); + // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); #elif BOOST_VERSION >= 105300 @@ -402,7 +448,11 @@ namespace fc { //current = prev; } - if( current->canceled ) { + if (reschedule) + current->prio = original_priority; + + if( current->canceled ) + { //current->canceled = false; #ifdef NDEBUG FC_THROW_EXCEPTION( canceled_exception, "" ); @@ -414,76 +464,113 @@ namespace fc { return true; } - static void start_process_tasks( intptr_t my ) { + static void start_process_tasks( intptr_t my ) + { thread_d* self = (thread_d*)my; - try { + try + { self->process_tasks(); - } catch ( canceled_exception& ) { - // allowed exception... - } catch ( ... 
) { - elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) ); - // assert( !"fiber exited with uncaught exception" ); - //TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< - // boost::current_exception_diagnostic_information() <name) ); + // assert( !"fiber exited with uncaught exception" ); + //TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< + // boost::current_exception_diagnostic_information() <free_list.push_back(self->current); self->start_next_fiber( false ); } - bool run_next_task() { - check_for_timeouts(); - task_base* next = dequeue(); + void run_next_task() + { + task_base* next = dequeue(); - if( next ) { - next->_set_active_context( current ); - current->cur_task = next; - next->run(); - current->cur_task = 0; - next->_set_active_context(0); - next->release(); - current->reinitialize(); - return true; - } - return false; + next->_set_active_context( current ); + current->cur_task = next; + next->run(); + current->cur_task = 0; + next->_set_active_context(0); + next->release(); + current->reinitialize(); } - bool has_next_task() { + + bool has_next_task() + { if( task_pqueue.size() || (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) || task_in_queue.load( boost::memory_order_relaxed ) ) - return true; + return true; return false; } - void clear_free_list() { - for( uint32_t i = 0; i < free_list.size(); ++i ) { + + void clear_free_list() + { + for( uint32_t i = 0; i < free_list.size(); ++i ) delete free_list[i]; - } free_list.clear(); } - void process_tasks() { - while( !done || blocked ) { - if( run_next_task() ) continue; + + void process_tasks() + { + while( !done || blocked ) + { + // move all new tasks to the task_pqueue + move_newly_scheduled_tasks_to_task_pqueue(); + + // move all now-ready sleeping tasks to the ready list + check_for_timeouts(); + + if (!task_pqueue.empty()) + { + if (!ready_heap.empty()) + { + // a 
new task and an existing task are both ready to go + if (task_priority_less()(task_pqueue.front(), ready_heap.front())) + { + // run the existing task first + pt_push_back(current); + start_next_fiber(false); + continue; + } + } + + // if we made it here, either there's no ready context, or the ready context is + // scheduled after the ready task, so we should run the task first + run_next_task(); + continue; + } // if I have something else to do other than // process tasks... do it. - if( ready_head ) { + if (!ready_heap.empty()) + { pt_push_back( current ); start_next_fiber(false); continue; } - if( process_canceled_tasks() ) continue; + if( process_canceled_tasks() ) + continue; clear_free_list(); { // lock scope boost::unique_lock lock(task_ready_mutex); - if( has_next_task() ) continue; + if( has_next_task() ) + continue; time_point timeout_time = check_for_timeouts(); - if( done ) return; - if( timeout_time == time_point::maximum() ) { + if( done ) + return; + if( timeout_time == time_point::maximum() ) task_ready.wait( lock ); - } else if( timeout_time != time_point::min() ) { + else if( timeout_time != time_point::min() ) + { // there may be tasks that have been canceled we should filter them out now // rather than waiting... @@ -515,64 +602,68 @@ namespace fc { * Retunn system_clock::time_point::max() if there are no scheduled tasks * Return the time the next task needs to be run if there is anything scheduled. 
*/ - time_point check_for_timeouts() { - if( !sleep_pqueue.size() && !task_sch_queue.size() ) { - //ilog( "no timeouts ready" ); - return time_point::maximum(); + time_point check_for_timeouts() + { + if( !sleep_pqueue.size() && !task_sch_queue.size() ) + { + // ilog( "no timeouts ready" ); + return time_point::maximum(); } time_point next = time_point::maximum(); - if( sleep_pqueue.size() && next > sleep_pqueue.front()->resume_time ) + if( !sleep_pqueue.empty() && next > sleep_pqueue.front()->resume_time ) next = sleep_pqueue.front()->resume_time; - if( task_sch_queue.size() && next > task_sch_queue.front()->_when ) + if( !task_sch_queue.empty() && next > task_sch_queue.front()->_when ) next = task_sch_queue.front()->_when; time_point now = time_point::now(); - if( now < next ) { return next; } + if( now < next ) + return next; // move all expired sleeping tasks to the ready queue while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) { - fc::context::ptr c = sleep_pqueue.front(); - std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); - //ilog( "sleep pop back..." ); - sleep_pqueue.pop_back(); + fc::context::ptr c = sleep_pqueue.front(); + std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); + // ilog( "sleep pop back..." ); + sleep_pqueue.pop_back(); - if( c->blocking_prom.size() ) - { - // ilog( "timeotu blocking prom" ); - c->timeout_blocking_promises(); - } - else - { - //ilog( "..." ); - //ilog( "ready_push_front" ); - if (c != current) - ready_push_front( c ); - } + if( c->blocking_prom.size() ) + { + // ilog( "timeout blocking prom" ); + c->timeout_blocking_promises(); + } + else + { + // ilog( "..." 
); + // ilog( "ready_push_front" ); + if (c != current) + add_context_to_ready_list(c); + } } return time_point::min(); } - void unblock( fc::context* c ) { - if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); }, "thread_d::unblock" ); - return; - } - if( c != current ) ready_push_front(c); - } + void unblock( fc::context* c ) + { + if( fc::thread::current().my != this ) + { + self.async( [=](){ unblock(c); }, "thread_d::unblock" ); + return; + } + + if (c != current) + add_context_to_ready_list(c); + } void yield_until( const time_point& tp, bool reschedule ) { check_fiber_exceptions(); if( tp <= (time_point::now()+fc::microseconds(10000)) ) - { return; - } - if( !current ) { + if( !current ) current = new fc::context(&fc::thread::current()); - } current->resume_time = tp; current->clear_blocking_promises(); @@ -584,8 +675,10 @@ namespace fc { start_next_fiber(reschedule); // clear current context from sleep queue... - for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) { - if( sleep_pqueue[i] == current ) { + for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) + { + if( sleep_pqueue[i] == current ) + { sleep_pqueue[i] = sleep_pqueue.back(); sleep_pqueue.pop_back(); std::make_heap( sleep_pqueue.begin(), @@ -599,35 +692,37 @@ namespace fc { } void wait( const promise_base::ptr& p, const time_point& timeout ) { - if( p->ready() ) return; + if( p->ready() ) + return; + if( timeout < time_point::now() ) - FC_THROW_EXCEPTION( timeout_exception, "" ); + FC_THROW_EXCEPTION( timeout_exception, "" ); - if( !current ) { + if( !current ) current = new fc::context(&fc::thread::current()); - } - //slog( " %1% blocking on %2%", current, p.get() ); + // slog( " %1% blocking on %2%", current, p.get() ); current->add_blocking_promise(p.get(),true); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { - current->resume_time = timeout; - sleep_pqueue.push_back(current); - std::push_heap( sleep_pqueue.begin(), - 
sleep_pqueue.end(), - sleep_priority_less() ); + if( timeout != time_point::maximum() ) + { + current->resume_time = timeout; + sleep_pqueue.push_back(current); + std::push_heap( sleep_pqueue.begin(), + sleep_pqueue.end(), + sleep_priority_less() ); } - // elog( "blocking %1%", current ); + // elog( "blocking %1%", current ); add_to_blocked( current ); - // debug("swtiching fibers..." ); + // debug("swtiching fibers..." ); start_next_fiber(); - // slog( "resuming %1%", current ); + // slog( "resuming %1%", current ); - //slog( " %1% unblocking blocking on %2%", current, p.get() ); + // slog( " %1% unblocking blocking on %2%", current, p.get() ); current->remove_blocking_promise(p.get()); check_fiber_exceptions(); @@ -652,7 +747,7 @@ namespace fc { { fc::context* next_blocked = (*iter)->next_blocked; (*iter)->next_blocked = nullptr; - ready_push_front(*iter); + add_context_to_ready_list(*iter); *iter = next_blocked; continue; } @@ -664,15 +759,10 @@ namespace fc { { if ((*sleep_iter)->canceled) { - bool already_on_ready_list = false; - for (fc::context* ready_iter = ready_head; ready_iter; ready_iter = ready_iter->next) - if (ready_iter == *sleep_iter) - { - already_on_ready_list = true; - break; - } + bool already_on_ready_list = std::find(ready_heap.begin(), ready_heap.end(), + *sleep_iter) != ready_heap.end(); if (!already_on_ready_list) - ready_push_front(*sleep_iter); + add_context_to_ready_list(*sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter); task_removed_from_sleep_pqueue = true; } diff --git a/src/variant.cpp b/src/variant.cpp index df5d716..148ea39 100644 --- a/src/variant.cpp +++ b/src/variant.cpp @@ -9,6 +9,7 @@ #include #include #include +#include namespace fc {