whitespace and spelling fixes, no functional changes

This commit is contained in:
Eric Frias 2014-10-13 11:44:16 -04:00
parent 5e7cd9776e
commit a426bf9710

View file

@ -37,7 +37,9 @@ namespace fc {
name = fc::string("th_") + char('a'+cnt++); name = fc::string("th_") + char('a'+cnt++);
// printf("thread=%p\n",this); // printf("thread=%p\n",this);
} }
~thread_d(){
~thread_d()
{
delete current; delete current;
fc::context* temp; fc::context* temp;
while (ready_head) while (ready_head)
@ -66,7 +68,8 @@ namespace fc {
boost_thread->detach(); boost_thread->detach();
delete boost_thread; delete boost_thread;
} }
} }
fc::thread& self; fc::thread& self;
boost::thread* boost_thread; boost::thread* boost_thread;
stack_allocator stack_alloc; stack_allocator stack_alloc;
@ -149,12 +152,14 @@ namespace fc {
} }
#endif #endif
// insert at from of blocked linked list // insert at from of blocked linked list
inline void add_to_blocked( fc::context* c ) { inline void add_to_blocked( fc::context* c )
{
c->next_blocked = blocked; c->next_blocked = blocked;
blocked = c; blocked = c;
} }
void pt_push_back(fc::context* c) { void pt_push_back(fc::context* c)
{
c->next = pt_head; c->next = pt_head;
pt_head = c; pt_head = c;
/* /*
@ -167,9 +172,12 @@ namespace fc {
wlog( "idle context...%2% %1%", c, i ); wlog( "idle context...%2% %1%", c, i );
*/ */
} }
fc::context::ptr ready_pop_front() {
fc::context::ptr ready_pop_front()
{
fc::context::ptr tmp = nullptr; fc::context::ptr tmp = nullptr;
if( ready_head ) { if( ready_head )
{
tmp = ready_head; tmp = ready_head;
ready_head = tmp->next; ready_head = tmp->next;
if( !ready_head ) if( !ready_head )
@ -178,35 +186,48 @@ namespace fc {
} }
return tmp; return tmp;
} }
void ready_push_front( const fc::context::ptr& c ) {
void ready_push_front( const fc::context::ptr& c )
{
BOOST_ASSERT( c->next == nullptr ); BOOST_ASSERT( c->next == nullptr );
BOOST_ASSERT( c != current ); BOOST_ASSERT( c != current );
//if( c == current ) wlog( "pushing current to ready??" ); // if( c == current )
// wlog( "pushing current to ready??" );
c->next = ready_head; c->next = ready_head;
ready_head = c; ready_head = c;
if( !ready_tail ) if( !ready_tail )
ready_tail = c; ready_tail = c;
} }
void ready_push_back( const fc::context::ptr& c ) {
void ready_push_back( const fc::context::ptr& c )
{
BOOST_ASSERT( c->next == nullptr ); BOOST_ASSERT( c->next == nullptr );
BOOST_ASSERT( c != current ); BOOST_ASSERT( c != current );
//if( c == current ) wlog( "pushing current to ready??" ); // if( c == current )
// wlog( "pushing current to ready??" );
c->next = 0; c->next = 0;
if( ready_tail ) { if( ready_tail )
ready_tail->next = c; ready_tail->next = c;
} else { else
assert( !ready_head ); {
ready_head = c; assert( !ready_head );
ready_head = c;
} }
ready_tail = c; ready_tail = c;
} }
struct task_priority_less {
bool operator()( task_base* a, task_base* b ) { struct task_priority_less
{
bool operator()( task_base* a, task_base* b )
{
return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num ); return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num );
} }
}; };
struct task_when_less {
bool operator()( task_base* a, task_base* b ) { struct task_when_less
{
bool operator()( task_base* a, task_base* b )
{
return a->_when > b->_when; return a->_when > b->_when;
} }
}; };
@ -251,7 +272,8 @@ namespace fc {
} }
} }
task_base* dequeue() { task_base* dequeue()
{
// get a new task // get a new task
BOOST_ASSERT( this == thread::current().my ); BOOST_ASSERT( this == thread::current().my );
@ -260,18 +282,22 @@ namespace fc {
//This appears to be safest replacement for now, maybe //This appears to be safest replacement for now, maybe
//can be changed to relaxed later, but needs analysis. //can be changed to relaxed later, but needs analysis.
pending = task_in_queue.exchange(0,boost::memory_order_seq_cst); pending = task_in_queue.exchange(0,boost::memory_order_seq_cst);
if( pending ) { enqueue( pending ); } if( pending )
enqueue( pending );
task_base* p(0); task_base* p = 0;
if( task_sch_queue.size() ) { if( !task_sch_queue.empty() )
if( task_sch_queue.front()->_when <= time_point::now() ) { {
if( task_sch_queue.front()->_when <= time_point::now() )
{
p = task_sch_queue.front(); p = task_sch_queue.front();
std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() ); std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less() );
task_sch_queue.pop_back(); task_sch_queue.pop_back();
return p; return p;
} }
} }
if( task_pqueue.size() ) { if( !task_pqueue.empty() )
{
p = task_pqueue.front(); p = task_pqueue.front();
std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() );
task_pqueue.pop_back(); task_pqueue.pop_back();
@ -307,7 +333,8 @@ namespace fc {
* This should be before or after a context switch to * This should be before or after a context switch to
* detect quit/cancel operations and throw an exception. * detect quit/cancel operations and throw an exception.
*/ */
void check_fiber_exceptions() { void check_fiber_exceptions()
{
if( current && current->canceled ) { if( current && current->canceled ) {
#ifdef NDEBUG #ifdef NDEBUG
FC_THROW_EXCEPTION( canceled_exception, "" ); FC_THROW_EXCEPTION( canceled_exception, "" );
@ -317,7 +344,7 @@ namespace fc {
} else if( done ) { } else if( done ) {
ilog( "throwing canceled exception" ); ilog( "throwing canceled exception" );
FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" ); FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" );
// BOOST_THROW_EXCEPTION( thread_quit() ); // BOOST_THROW_EXCEPTION( thread_quit() );
} }
} }
@ -326,7 +353,8 @@ namespace fc {
* If none are available then create a new context and * If none are available then create a new context and
* have it wait for something to do. * have it wait for something to do.
*/ */
bool start_next_fiber( bool reschedule = false ) { bool start_next_fiber( bool reschedule = false )
{
/* If this assert fires, it means you are executing an operation that is causing /* If this assert fires, it means you are executing an operation that is causing
* the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect * the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect
* (somewhere up the stack) */ * (somewhere up the stack) */
@ -342,54 +370,67 @@ namespace fc {
assert(std::current_exception() == std::exception_ptr()); assert(std::current_exception() == std::exception_ptr());
check_for_timeouts(); check_for_timeouts();
if( !current ) current = new fc::context( &fc::thread::current() ); if( !current )
current = new fc::context( &fc::thread::current() );
// check to see if any other contexts are ready // check to see if any other contexts are ready
if( ready_head ) { if( ready_head )
{
fc::context* next = ready_pop_front(); fc::context* next = ready_pop_front();
if( next == current ) { if( next == current )
{
// elog( "next == current... something went wrong" ); // elog( "next == current... something went wrong" );
assert(next != current); assert(next != current);
return false; return false;
} }
BOOST_ASSERT( next != current ); BOOST_ASSERT( next != current );
// jump to next context, saving current context // jump to next context, saving current context
fc::context* prev = current; fc::context* prev = current;
current = next; current = next;
if( reschedule ) ready_push_back(prev); if( reschedule )
// slog( "jump to %p from %p", next, prev ); ready_push_back(prev);
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); // slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 105600 #if BOOST_VERSION >= 105600
bc::jump_fcontext( &prev->my_context, next->my_context, 0 ); bc::jump_fcontext( &prev->my_context, next->my_context, 0 );
#elif BOOST_VERSION >= 105300 #elif BOOST_VERSION >= 105300
bc::jump_fcontext( prev->my_context, next->my_context, 0 ); bc::jump_fcontext( prev->my_context, next->my_context, 0 );
#else #else
bc::jump_fcontext( &prev->my_context, &next->my_context, 0 ); bc::jump_fcontext( &prev->my_context, &next->my_context, 0 );
#endif #endif
BOOST_ASSERT( current ); BOOST_ASSERT( current );
BOOST_ASSERT( current == prev ); BOOST_ASSERT( current == prev );
//current = prev; //current = prev;
} else { // all contexts are blocked, create a new context }
// that will process posted tasks... else
{
// all contexts are blocked, create a new context
// that will process posted tasks...
fc::context* prev = current; fc::context* prev = current;
fc::context* next = nullptr; fc::context* next = nullptr;
if( pt_head ) { // grab cached context if( pt_head )
{
// grab cached context
next = pt_head; next = pt_head;
pt_head = pt_head->next; pt_head = pt_head->next;
next->next = 0; next->next = 0;
next->reinitialize(); next->reinitialize();
} else { // create new context. }
else
{
// create new context.
next = new fc::context( &thread_d::start_process_tasks, stack_alloc, next = new fc::context( &thread_d::start_process_tasks, stack_alloc,
&fc::thread::current() ); &fc::thread::current() );
} }
current = next; current = next;
if( reschedule ) ready_push_back(prev); if( reschedule )
ready_push_back(prev);
// slog( "jump to %p from %p", next, prev ); // slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 105600 #if BOOST_VERSION >= 105600
bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this ); bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this );
#elif BOOST_VERSION >= 105300 #elif BOOST_VERSION >= 105300
@ -402,7 +443,8 @@ namespace fc {
//current = prev; //current = prev;
} }
if( current->canceled ) { if( current->canceled )
{
//current->canceled = false; //current->canceled = false;
#ifdef NDEBUG #ifdef NDEBUG
FC_THROW_EXCEPTION( canceled_exception, "" ); FC_THROW_EXCEPTION( canceled_exception, "" );
@ -414,27 +456,35 @@ namespace fc {
return true; return true;
} }
static void start_process_tasks( intptr_t my ) { static void start_process_tasks( intptr_t my )
{
thread_d* self = (thread_d*)my; thread_d* self = (thread_d*)my;
try { try
{
self->process_tasks(); self->process_tasks();
} catch ( canceled_exception& ) { }
// allowed exception... catch ( canceled_exception& )
} catch ( ... ) { {
elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) ); // allowed exception...
// assert( !"fiber exited with uncaught exception" ); }
//TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< catch ( ... )
// boost::current_exception_diagnostic_information() <<std::endl; {
elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) );
// assert( !"fiber exited with uncaught exception" );
//TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<<
// boost::current_exception_diagnostic_information() <<std::endl;
} }
self->free_list.push_back(self->current); self->free_list.push_back(self->current);
self->start_next_fiber( false ); self->start_next_fiber( false );
} }
bool run_next_task() { bool run_next_task()
{
check_for_timeouts(); check_for_timeouts();
task_base* next = dequeue(); task_base* next = dequeue();
if( next ) { if( next )
{
next->_set_active_context( current ); next->_set_active_context( current );
current->cur_task = next; current->cur_task = next;
next->run(); next->run();
@ -446,32 +496,41 @@ namespace fc {
} }
return false; return false;
} }
bool has_next_task() {
bool has_next_task()
{
if( task_pqueue.size() || if( task_pqueue.size() ||
(task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) || (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) ||
task_in_queue.load( boost::memory_order_relaxed ) ) task_in_queue.load( boost::memory_order_relaxed ) )
return true; return true;
return false; return false;
} }
void clear_free_list() {
for( uint32_t i = 0; i < free_list.size(); ++i ) { void clear_free_list()
{
for( uint32_t i = 0; i < free_list.size(); ++i )
delete free_list[i]; delete free_list[i];
}
free_list.clear(); free_list.clear();
} }
void process_tasks() {
while( !done || blocked ) { void process_tasks()
if( run_next_task() ) continue; {
while( !done || blocked )
{
if( run_next_task() )
continue;
// if I have something else to do other than // if I have something else to do other than
// process tasks... do it. // process tasks... do it.
if( ready_head ) { if( ready_head )
{
pt_push_back( current ); pt_push_back( current );
start_next_fiber(false); start_next_fiber(false);
continue; continue;
} }
if( process_canceled_tasks() ) continue; if( process_canceled_tasks() )
continue;
clear_free_list(); clear_free_list();
@ -481,9 +540,12 @@ namespace fc {
time_point timeout_time = check_for_timeouts(); time_point timeout_time = check_for_timeouts();
if( done ) return; if( done ) return;
if( timeout_time == time_point::maximum() ) { if( timeout_time == time_point::maximum() )
{
task_ready.wait( lock ); task_ready.wait( lock );
} else if( timeout_time != time_point::min() ) { }
else if( timeout_time != time_point::min() )
{
// there may be tasks that have been canceled we should filter them out now // there may be tasks that have been canceled we should filter them out now
// rather than waiting... // rather than waiting...
@ -515,10 +577,12 @@ namespace fc {
* Retunn system_clock::time_point::max() if there are no scheduled tasks * Retunn system_clock::time_point::max() if there are no scheduled tasks
* Return the time the next task needs to be run if there is anything scheduled. * Return the time the next task needs to be run if there is anything scheduled.
*/ */
time_point check_for_timeouts() { time_point check_for_timeouts()
if( !sleep_pqueue.size() && !task_sch_queue.size() ) { {
//ilog( "no timeouts ready" ); if( !sleep_pqueue.size() && !task_sch_queue.size() )
return time_point::maximum(); {
// ilog( "no timeouts ready" );
return time_point::maximum();
} }
time_point next = time_point::maximum(); time_point next = time_point::maximum();
@ -528,51 +592,53 @@ namespace fc {
next = task_sch_queue.front()->_when; next = task_sch_queue.front()->_when;
time_point now = time_point::now(); time_point now = time_point::now();
if( now < next ) { return next; } if( now < next )
return next;
// move all expired sleeping tasks to the ready queue // move all expired sleeping tasks to the ready queue
while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now )
{ {
fc::context::ptr c = sleep_pqueue.front(); fc::context::ptr c = sleep_pqueue.front();
std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() );
//ilog( "sleep pop back..." ); // ilog( "sleep pop back..." );
sleep_pqueue.pop_back(); sleep_pqueue.pop_back();
if( c->blocking_prom.size() ) if( c->blocking_prom.size() )
{ {
// ilog( "timeotu blocking prom" ); // ilog( "timeout blocking prom" );
c->timeout_blocking_promises(); c->timeout_blocking_promises();
} }
else else
{ {
//ilog( "..." ); // ilog( "..." );
//ilog( "ready_push_front" ); // ilog( "ready_push_front" );
if (c != current) if (c != current)
ready_push_front( c ); ready_push_front( c );
} }
} }
return time_point::min(); return time_point::min();
} }
void unblock( fc::context* c ) { void unblock( fc::context* c )
if( fc::thread::current().my != this ) { {
self.async( [=](){ unblock(c); }, "thread_d::unblock" ); if( fc::thread::current().my != this )
return; {
} self.async( [=](){ unblock(c); }, "thread_d::unblock" );
if( c != current ) ready_push_front(c); return;
} }
if( c != current )
ready_push_front(c);
}
void yield_until( const time_point& tp, bool reschedule ) { void yield_until( const time_point& tp, bool reschedule ) {
check_fiber_exceptions(); check_fiber_exceptions();
if( tp <= (time_point::now()+fc::microseconds(10000)) ) if( tp <= (time_point::now()+fc::microseconds(10000)) )
{
return; return;
}
if( !current ) { if( !current )
current = new fc::context(&fc::thread::current()); current = new fc::context(&fc::thread::current());
}
current->resume_time = tp; current->resume_time = tp;
current->clear_blocking_promises(); current->clear_blocking_promises();
@ -584,8 +650,10 @@ namespace fc {
start_next_fiber(reschedule); start_next_fiber(reschedule);
// clear current context from sleep queue... // clear current context from sleep queue...
for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) { for( uint32_t i = 0; i < sleep_pqueue.size(); ++i )
if( sleep_pqueue[i] == current ) { {
if( sleep_pqueue[i] == current )
{
sleep_pqueue[i] = sleep_pqueue.back(); sleep_pqueue[i] = sleep_pqueue.back();
sleep_pqueue.pop_back(); sleep_pqueue.pop_back();
std::make_heap( sleep_pqueue.begin(), std::make_heap( sleep_pqueue.begin(),
@ -599,35 +667,37 @@ namespace fc {
} }
void wait( const promise_base::ptr& p, const time_point& timeout ) { void wait( const promise_base::ptr& p, const time_point& timeout ) {
if( p->ready() ) return; if( p->ready() )
return;
if( timeout < time_point::now() ) if( timeout < time_point::now() )
FC_THROW_EXCEPTION( timeout_exception, "" ); FC_THROW_EXCEPTION( timeout_exception, "" );
if( !current ) { if( !current )
current = new fc::context(&fc::thread::current()); current = new fc::context(&fc::thread::current());
}
//slog( " %1% blocking on %2%", current, p.get() ); // slog( " %1% blocking on %2%", current, p.get() );
current->add_blocking_promise(p.get(),true); current->add_blocking_promise(p.get(),true);
// if not max timeout, added to sleep pqueue // if not max timeout, added to sleep pqueue
if( timeout != time_point::maximum() ) { if( timeout != time_point::maximum() )
current->resume_time = timeout; {
sleep_pqueue.push_back(current); current->resume_time = timeout;
std::push_heap( sleep_pqueue.begin(), sleep_pqueue.push_back(current);
sleep_pqueue.end(), std::push_heap( sleep_pqueue.begin(),
sleep_priority_less() ); sleep_pqueue.end(),
sleep_priority_less() );
} }
// elog( "blocking %1%", current ); // elog( "blocking %1%", current );
add_to_blocked( current ); add_to_blocked( current );
// debug("swtiching fibers..." ); // debug("swtiching fibers..." );
start_next_fiber(); start_next_fiber();
// slog( "resuming %1%", current ); // slog( "resuming %1%", current );
//slog( " %1% unblocking blocking on %2%", current, p.get() ); // slog( " %1% unblocking blocking on %2%", current, p.get() );
current->remove_blocking_promise(p.get()); current->remove_blocking_promise(p.get());
check_fiber_exceptions(); check_fiber_exceptions();