From 7bf63742992ea7a20233a753336aaf6bbe88ebdd Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Mon, 19 Aug 2013 14:44:13 -0400 Subject: [PATCH] MAJOR BUG FIX - fc::usleep causing hang&leak In certain cases when usleep is passed a small value, there is a race condition that would cause the process to hang and then when an attempt to quit the thread was made new contexts would be allocated rapidly filling all available memory. --- CMakeLists.txt | 2 ++ src/thread/thread.cpp | 10 ++++++++- src/thread/thread_d.hpp | 50 +++++++++++++++++++++++++---------------- 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 901a70e..a556d70 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -136,4 +136,6 @@ set( BOOST_LIBRARIES ${Boost_THREAD_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FIL #target_link_libraries( test_compress fc ${BOOST_LIBRARIES} ) #add_executable( test_aes tests/aes_test.cpp ) #target_link_libraries( test_aes fc ${BOOST_LIBRARIES} ) +#add_executable( test_sleep tests/sleep.cpp ) +#target_link_libraries( test_sleep fc ${BOOST_LIBRARIES} ) diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index b96a9ce..4de4344 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -136,7 +136,6 @@ namespace fc { cur->canceled = true; cur = cur->next; } - my->done = true; // now that we have poked all fibers... switch to the next one and @@ -177,6 +176,14 @@ namespace fc { my->check_fiber_exceptions(); } void thread::sleep_until( const time_point& tp ) { + //ilog( "sleep until ${tp} wait: ${delta}", ("tp",tp)("delta",(tp-fc::time_point::now()).count()) ); + + if( tp <= (time_point::now()+fc::microseconds(500)) ) + { + this->yield(true); + } + my->yield_until( tp, false ); + /* my->check_fiber_exceptions(); BOOST_ASSERT( ¤t() == this ); @@ -195,6 +202,7 @@ namespace fc { my->current->resume_time = time_point::maximum(); my->check_fiber_exceptions(); + */ } int thread::wait_any_until( std::vector&& p, const time_point& timeout) { for( size_t i = 0; i < p.size(); ++i ) { diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index d6758d6..b482b2e 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -89,7 +89,7 @@ namespace fc { #if 0 void debug( const fc::string& s ) { - return; + return; //boost::unique_lock lock(log_mutex()); fc::cerr<<"--------------------- "<next == nullptr ); BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); + //if( c == current ) wlog( "pushing current to ready??" ); c->next = ready_head; ready_head = c; if( !ready_tail ) @@ -174,7 +174,7 @@ namespace fc { void ready_push_back( const fc::context::ptr& c ) { BOOST_ASSERT( c->next == nullptr ); BOOST_ASSERT( c != current ); - //if( c == current ) wlog( "pushing current to ready??" ); + //if( c == current ) wlog( "pushing current to ready??" ); c->next = 0; if( ready_tail ) { ready_tail->next = c; @@ -246,6 +246,7 @@ namespace fc { if( current && current->canceled ) { FC_THROW_EXCEPTION( canceled_exception, "" ); } else if( done ) { + ilog( "throwing canceled exception" ); FC_THROW_EXCEPTION( canceled_exception, "" ); // BOOST_THROW_EXCEPTION( thread_quit() ); } @@ -398,47 +399,58 @@ namespace fc { */ time_point check_for_timeouts() { if( !sleep_pqueue.size() && !task_sch_queue.size() ) { + //ilog( "no timeouts ready" ); return time_point::maximum(); } - time_point next = time_point::maximum(); - if( task_sch_queue.size() && next > task_sch_queue.front()->_when ) - next = task_sch_queue.front()->_when; if( sleep_pqueue.size() && next > sleep_pqueue.front()->resume_time ) next = sleep_pqueue.front()->resume_time; + if( task_sch_queue.size() && next > task_sch_queue.front()->_when ) + next = task_sch_queue.front()->_when; time_point now = time_point::now(); if( now < next ) { return next; } // move all expired sleeping tasks to the ready queue - while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) { + while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) + { fc::context::ptr c = sleep_pqueue.front(); std::pop_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); + //ilog( "sleep pop back..." ); sleep_pqueue.pop_back(); - if( c->blocking_prom.size() ) { + if( c->blocking_prom.size() ) + { + // ilog( "timeotu blocking prom" ); c->timeout_blocking_promises(); } - else { - if( c != current ) ready_push_front( c ); + else + { + //ilog( "..." ); + FC_ASSERT( c != current ) + //ilog( "ready_push_front" ); + ready_push_front( c ); } } return time_point::min(); } - void unblock( fc::context* c ) { - if( fc::thread::current().my != this ) { - async( [=](){ unblock(c); } ); - return; - } - if( c != current ) ready_push_front(c); - } + void unblock( fc::context* c ) { + if( fc::thread::current().my != this ) { + async( [=](){ unblock(c); } ); + return; + } + if( c != current ) ready_push_front(c); + } + void yield_until( const time_point& tp, bool reschedule ) { check_fiber_exceptions(); - if( tp <= time_point::now() ) + if( tp <= (time_point::now()+fc::microseconds(500)) ) + { return; + } if( !current ) { current = new fc::context(&fc::thread::current()); @@ -450,7 +462,7 @@ namespace fc { sleep_pqueue.push_back(current); std::push_heap( sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() ); - + start_next_fiber(reschedule); // clear current context from sleep queue...