diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 624d580..eda1c74 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -37,7 +37,9 @@ namespace fc { name = fc::string("th_") + char('a'+cnt++); // printf("thread=%p\n",this); } - ~thread_d(){ + + ~thread_d() + { delete current; fc::context* temp; while (ready_head) @@ -66,7 +68,8 @@ namespace fc { boost_thread->detach(); delete boost_thread; } - } + } + fc::thread& self; boost::thread* boost_thread; stack_allocator stack_alloc; @@ -149,12 +152,14 @@ namespace fc { } #endif // insert at from of blocked linked list - inline void add_to_blocked( fc::context* c ) { + inline void add_to_blocked( fc::context* c ) + { c->next_blocked = blocked; blocked = c; } - void pt_push_back(fc::context* c) { + void pt_push_back(fc::context* c) + { c->next = pt_head; pt_head = c; /* @@ -167,9 +172,12 @@ namespace fc { wlog( "idle context...%2% %1%", c, i ); */ } - fc::context::ptr ready_pop_front() { + + fc::context::ptr ready_pop_front() + { fc::context::ptr tmp = nullptr; - if( ready_head ) { + if( ready_head ) + { tmp = ready_head; ready_head = tmp->next; if( !ready_head ) @@ -178,35 +186,48 @@ namespace fc { } return tmp; } - void ready_push_front( const fc::context::ptr& c ) { + + void ready_push_front( const fc::context::ptr& c ) + { BOOST_ASSERT( c->next == nullptr ); BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); + // if( c == current ) + // wlog( "pushing current to ready??" ); c->next = ready_head; ready_head = c; if( !ready_tail ) ready_tail = c; } - void ready_push_back( const fc::context::ptr& c ) { + + void ready_push_back( const fc::context::ptr& c ) + { BOOST_ASSERT( c->next == nullptr ); BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); + // if( c == current ) + // wlog( "pushing current to ready??" ); c->next = 0; - if( ready_tail ) { + if( ready_tail ) ready_tail->next = c; - } else { - assert( !ready_head ); - ready_head = c; + else + { + assert( !ready_head ); + ready_head = c; } ready_tail = c; } - struct task_priority_less { - bool operator()( task_base* a, task_base* b ) { + + struct task_priority_less + { + bool operator()( task_base* a, task_base* b ) + { return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num ); } }; - struct task_when_less { - bool operator()( task_base* a, task_base* b ) { + + struct task_when_less + { + bool operator()( task_base* a, task_base* b ) + { return a->_when > b->_when; } }; @@ -251,7 +272,8 @@ namespace fc { } } - task_base* dequeue() { + task_base* dequeue() + { // get a new task BOOST_ASSERT( this == thread::current().my ); @@ -260,18 +282,22 @@ namespace fc { //This appears to be safest replacement for now, maybe //can be changed to relaxed later, but needs analysis. pending = task_in_queue.exchange(0,boost::memory_order_seq_cst); - if( pending ) { enqueue( pending ); } + if( pending ) + enqueue( pending ); - task_base* p(0); - if( task_sch_queue.size() ) { - if( task_sch_queue.front()->_when <= time_point::now() ) { + task_base* p = 0; + if( !task_sch_queue.empty() ) + { + if( task_sch_queue.front()->_when <= time_point::now() ) + { p = task_sch_queue.front(); std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() ); task_sch_queue.pop_back(); return p; } } - if( task_pqueue.size() ) { + if( !task_pqueue.empty() ) + { p = task_pqueue.front(); std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); task_pqueue.pop_back(); @@ -307,7 +333,8 @@ namespace fc { * This should be before or after a context switch to * detect quit/cancel operations and throw an exception. */ - void check_fiber_exceptions() { + void check_fiber_exceptions() + { if( current && current->canceled ) { #ifdef NDEBUG FC_THROW_EXCEPTION( canceled_exception, "" ); @@ -317,7 +344,7 @@ namespace fc { } else if( done ) { ilog( "throwing canceled exception" ); FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" ); - // BOOST_THROW_EXCEPTION( thread_quit() ); + // BOOST_THROW_EXCEPTION( thread_quit() ); } } @@ -326,7 +353,8 @@ namespace fc { * If none are available then create a new context and * have it wait for something to do. */ - bool start_next_fiber( bool reschedule = false ) { + bool start_next_fiber( bool reschedule = false ) + { /* If this assert fires, it means you are executing an operation that is causing * the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect * (somewhere up the stack) */ @@ -342,54 +370,67 @@ namespace fc { assert(std::current_exception() == std::exception_ptr()); check_for_timeouts(); - if( !current ) current = new fc::context( &fc::thread::current() ); + if( !current ) + current = new fc::context( &fc::thread::current() ); // check to see if any other contexts are ready - if( ready_head ) { + if( ready_head ) + { fc::context* next = ready_pop_front(); - if( next == current ) { + if( next == current ) + { // elog( "next == current... something went wrong" ); assert(next != current); - return false; + return false; } BOOST_ASSERT( next != current ); // jump to next context, saving current context fc::context* prev = current; current = next; - if( reschedule ) ready_push_back(prev); - // slog( "jump to %p from %p", next, prev ); - // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + if( reschedule ) + ready_push_back(prev); + // slog( "jump to %p from %p", next, prev ); + // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); #if BOOST_VERSION >= 105600 - bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); + bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); #elif BOOST_VERSION >= 105300 - bc::jump_fcontext( prev->my_context, next->my_context, 0 ); + bc::jump_fcontext( prev->my_context, next->my_context, 0 ); #else - bc::jump_fcontext( &prev->my_context, &next->my_context, 0 ); + bc::jump_fcontext( &prev->my_context, &next->my_context, 0 ); #endif - BOOST_ASSERT( current ); - BOOST_ASSERT( current == prev ); + BOOST_ASSERT( current ); + BOOST_ASSERT( current == prev ); //current = prev; - } else { // all contexts are blocked, create a new context - // that will process posted tasks... + } + else + { + // all contexts are blocked, create a new context + // that will process posted tasks... fc::context* prev = current; fc::context* next = nullptr; - if( pt_head ) { // grab cached context + if( pt_head ) + { + // grab cached context next = pt_head; pt_head = pt_head->next; next->next = 0; next->reinitialize(); - } else { // create new context. + } + else + { + // create new context. next = new fc::context( &thread_d::start_process_tasks, stack_alloc, - &fc::thread::current() ); + &fc::thread::current() ); } current = next; - if( reschedule ) ready_push_back(prev); + if( reschedule ) + ready_push_back(prev); - // slog( "jump to %p from %p", next, prev ); - // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); + // slog( "jump to %p from %p", next, prev ); + // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); #if BOOST_VERSION >= 105600 bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); #elif BOOST_VERSION >= 105300 @@ -402,7 +443,8 @@ namespace fc { //current = prev; } - if( current->canceled ) { + if( current->canceled ) + { //current->canceled = false; #ifdef NDEBUG FC_THROW_EXCEPTION( canceled_exception, "" ); @@ -414,27 +456,35 @@ namespace fc { return true; } - static void start_process_tasks( intptr_t my ) { + static void start_process_tasks( intptr_t my ) + { thread_d* self = (thread_d*)my; - try { + try + { self->process_tasks(); - } catch ( canceled_exception& ) { - // allowed exception... - } catch ( ... ) { - elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) ); - // assert( !"fiber exited with uncaught exception" ); - //TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< - // boost::current_exception_diagnostic_information() <name) ); + // assert( !"fiber exited with uncaught exception" ); + //TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< + // boost::current_exception_diagnostic_information() <free_list.push_back(self->current); self->start_next_fiber( false ); } - bool run_next_task() { + bool run_next_task() + { check_for_timeouts(); task_base* next = dequeue(); - if( next ) { + if( next ) + { next->_set_active_context( current ); current->cur_task = next; next->run(); @@ -446,32 +496,41 @@ namespace fc { } return false; } - bool has_next_task() { + + bool has_next_task() + { if( task_pqueue.size() || (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) || task_in_queue.load( boost::memory_order_relaxed ) ) - return true; + return true; return false; } - void clear_free_list() { - for( uint32_t i = 0; i < free_list.size(); ++i ) { + + void clear_free_list() + { + for( uint32_t i = 0; i < free_list.size(); ++i ) delete free_list[i]; - } free_list.clear(); } - void process_tasks() { - while( !done || blocked ) { - if( run_next_task() ) continue; + + void process_tasks() + { + while( !done || blocked ) + { + if( run_next_task() ) + continue; // if I have something else to do other than // process tasks... do it. - if( ready_head ) { + if( ready_head ) + { pt_push_back( current ); start_next_fiber(false); continue; } - if( process_canceled_tasks() ) continue; + if( process_canceled_tasks() ) + continue; clear_free_list(); @@ -481,9 +540,12 @@ namespace fc { time_point timeout_time = check_for_timeouts(); if( done ) return; - if( timeout_time == time_point::maximum() ) { + if( timeout_time == time_point::maximum() ) + { task_ready.wait( lock ); - } else if( timeout_time != time_point::min() ) { + } + else if( timeout_time != time_point::min() ) + { // there may be tasks that have been canceled we should filter them out now // rather than waiting... @@ -515,10 +577,12 @@ namespace fc { * Retunn system_clock::time_point::max() if there are no scheduled tasks * Return the time the next task needs to be run if there is anything scheduled. */ - time_point check_for_timeouts() { - if( !sleep_pqueue.size() && !task_sch_queue.size() ) { - //ilog( "no timeouts ready" ); - return time_point::maximum(); + time_point check_for_timeouts() + { + if( !sleep_pqueue.size() && !task_sch_queue.size() ) + { + // ilog( "no timeouts ready" ); + return time_point::maximum(); } time_point next = time_point::maximum(); @@ -528,51 +592,53 @@ namespace fc { next = task_sch_queue.front()->_when; time_point now = time_point::now(); - if( now < next ) { return next; } + if( now < next ) + return next; // move all expired sleeping tasks to the ready queue while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) { - fc::context::ptr c = sleep_pqueue.front(); - std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); - //ilog( "sleep pop back..." ); - sleep_pqueue.pop_back(); + fc::context::ptr c = sleep_pqueue.front(); + std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); + // ilog( "sleep pop back..." ); + sleep_pqueue.pop_back(); - if( c->blocking_prom.size() ) - { - // ilog( "timeotu blocking prom" ); - c->timeout_blocking_promises(); - } - else - { - //ilog( "..." ); - //ilog( "ready_push_front" ); - if (c != current) - ready_push_front( c ); - } + if( c->blocking_prom.size() ) + { + // ilog( "timeout blocking prom" ); + c->timeout_blocking_promises(); + } + else + { + // ilog( "..." ); + // ilog( "ready_push_front" ); + if (c != current) + ready_push_front( c ); + } } return time_point::min(); } - void unblock( fc::context* c ) { - if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); }, "thread_d::unblock" ); - return; - } - if( c != current ) ready_push_front(c); - } + void unblock( fc::context* c ) + { + if( fc::thread::current().my != this ) + { + self.async( [=](){ unblock(c); }, "thread_d::unblock" ); + return; + } + + if( c != current ) + ready_push_front(c); + } void yield_until( const time_point& tp, bool reschedule ) { check_fiber_exceptions(); if( tp <= (time_point::now()+fc::microseconds(10000)) ) - { return; - } - if( !current ) { + if( !current ) current = new fc::context(&fc::thread::current()); - } current->resume_time = tp; current->clear_blocking_promises(); @@ -584,8 +650,10 @@ namespace fc { start_next_fiber(reschedule); // clear current context from sleep queue... - for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) { - if( sleep_pqueue[i] == current ) { + for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) + { + if( sleep_pqueue[i] == current ) + { sleep_pqueue[i] = sleep_pqueue.back(); sleep_pqueue.pop_back(); std::make_heap( sleep_pqueue.begin(), @@ -599,35 +667,37 @@ namespace fc { } void wait( const promise_base::ptr& p, const time_point& timeout ) { - if( p->ready() ) return; + if( p->ready() ) + return; + if( timeout < time_point::now() ) - FC_THROW_EXCEPTION( timeout_exception, "" ); + FC_THROW_EXCEPTION( timeout_exception, "" ); - if( !current ) { + if( !current ) current = new fc::context(&fc::thread::current()); - } - //slog( " %1% blocking on %2%", current, p.get() ); + // slog( " %1% blocking on %2%", current, p.get() ); current->add_blocking_promise(p.get(),true); // if not max timeout, added to sleep pqueue - if( timeout != time_point::maximum() ) { - current->resume_time = timeout; - sleep_pqueue.push_back(current); - std::push_heap( sleep_pqueue.begin(), - sleep_pqueue.end(), - sleep_priority_less() ); + if( timeout != time_point::maximum() ) + { + current->resume_time = timeout; + sleep_pqueue.push_back(current); + std::push_heap( sleep_pqueue.begin(), + sleep_pqueue.end(), + sleep_priority_less() ); } - // elog( "blocking %1%", current ); + // elog( "blocking %1%", current ); add_to_blocked( current ); - // debug("swtiching fibers..." ); + // debug("swtiching fibers..." ); start_next_fiber(); - // slog( "resuming %1%", current ); + // slog( "resuming %1%", current ); - //slog( " %1% unblocking blocking on %2%", current, p.get() ); + // slog( " %1% unblocking blocking on %2%", current, p.get() ); current->remove_blocking_promise(p.get()); check_fiber_exceptions();