#include <fc/thread/thread.hpp>
#include <fc/string.hpp>
#include <fc/time.hpp>
#include <boost/thread.hpp>
#include "context.hpp"
#include <boost/thread/condition_variable.hpp>
#include <boost/atomic.hpp>
#include <vector>

//#include <fc/logger.hpp>
namespace fc {
  struct sleep_priority_less {
    bool operator()( const context::ptr& a, const context::ptr& b ) {
      return a->resume_time > b->resume_time;
    }
  };
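
  // Private per-thread state behind fc::thread: the boost thread it runs on,
  // the fiber contexts it multiplexes, and the task/sleep queues used to
  // schedule them.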
  class thread_d {
    public:
      thread_d( fc::thread& s )
      : self(s), boost_thread(0),
        task_in_queue(0),
        done(false),
        current(0),
        pt_head(0),
        ready_head(0),
        ready_tail(0),
        blocked(0),
        next_unused_task_storage_slot(0)
#ifndef NDEBUG
        , non_preemptable_scope_count(0)
#endif
      {
        static boost::atomic<int> cnt(0);
        name = fc::string("th_") + char('a' + cnt++);
        // printf("thread=%p\n",this);
      }
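
      // Tear down any contexts still owned by this thread; the underlying
      // boost::thread, if any, is detached rather than joined.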
      ~thread_d() {
        delete current;
        fc::context* temp;
        while( ready_head ) {
          temp = ready_head->next;
          delete ready_head;
          ready_head = temp;
        }
        while( blocked ) {
          temp = blocked->next;
          delete blocked;
          blocked = temp;
        }
        /*
        while( pt_head ) {
          temp = pt_head->next;
          delete pt_head;
          pt_head = temp;
        }
        */
        //ilog("");
        if( boost_thread ) {
          boost_thread->detach();
          delete boost_thread;
        }
      }

      fc::thread&               self;
      boost::thread*            boost_thread;
      stack_allocator           stack_alloc;
      boost::condition_variable task_ready;
      boost::mutex              task_ready_mutex;

      boost::atomic<task_base*> task_in_queue;
      std::vector<task_base*>   task_pqueue;    // heap of tasks that have never started, ordered by priority & scheduling time
      std::vector<task_base*>   task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run
      std::vector<fc::context*> sleep_pqueue;   // heap of running tasks that have slept, ordered by the time they should resume
      std::vector<fc::context*> free_list;      // list of unused contexts that are ready for deletion

      bool                      done;
      fc::string                name;
      fc::context*              current;        // the currently-executing task in this thread

      fc::context*              pt_head;        // list of contexts that can be reused for new tasks

      fc::context*              ready_head;     // linked list (using 'next') of contexts that are ready to run
      fc::context*              ready_tail;

      fc::context*              blocked;        // linked list of contexts (using 'next_blocked') blocked on promises via wait()

      // values for thread-specific data objects for this thread
      std::vector<detail::specific_data_info> thread_specific_data;
      // values for task-specific data for code executing on a thread that's
      // not a task launched by async (usually the default task on the main
      // thread in a process)
      std::vector<detail::specific_data_info> non_task_specific_data;
      unsigned next_unused_task_storage_slot;

#ifndef NDEBUG
      unsigned                  non_preemptable_scope_count;
#endif

#if 0
      void debug( const fc::string& s ) {
        return;
        //boost::unique_lock<boost::mutex> lock(log_mutex());

        fc::cerr<<"--------------------- "<<s.c_str()<<" - "<<current;
        if( current && current->cur_task ) fc::cerr<<'('<<current->cur_task->get_desc()<<')';
        fc::cerr<<" ---------------------------\n";
        fc::cerr<<"  Ready\n";
        fc::context* c = ready_head;
        while( c ) {
          fc::cerr<<"    "<<c;
          if( c->cur_task ) fc::cerr<<'('<<c->cur_task->get_desc()<<')';
          fc::context* p = c->caller_context;
          while( p ) {
            fc::cerr<<"  ->  "<<p;
            p = p->caller_context;
          }
          fc::cerr<<"\n";
          c = c->next;
        }
        fc::cerr<<"  Blocked\n";
        c = blocked;
        while( c ) {
          fc::cerr<<"   ctx: "<<c;
          if( c->cur_task ) fc::cerr<<'('<<c->cur_task->get_desc()<<')';
          fc::cerr<<" blocked on prom: ";
          for( uint32_t i = 0; i < c->blocking_prom.size(); ++i ) {
            fc::cerr<<c->blocking_prom[i].prom<<'('<<c->blocking_prom[i].prom->get_desc()<<')';
            if( i + 1 < c->blocking_prom.size() ) {
              fc::cerr<<",";
            }
          }
          fc::context* p = c->caller_context;
          while( p ) {
            fc::cerr<<"  ->  "<<p;
            p = p->caller_context;
          }
          fc::cerr<<"\n";
          c = c->next_blocked;
        }
        fc::cerr<<"-------------------------------------------------\n";
      }
#endif

      // insert at front of blocked linked list
      inline void add_to_blocked( fc::context* c ) {
        c->next_blocked = blocked;
        blocked = c;
      }

      void pt_push_back( fc::context* c ) {
        c->next = pt_head;
        pt_head = c;
        /*
        fc::context* n = pt_head;
        int i = 0;
        while( n ) {
          ++i;
          n = n->next;
        }
        wlog( "idle context...%2%  %1%", c, i );
        */
      }
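
      // Pop the next runnable context off the ready list, or return nullptr
      // if the list is empty.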
      fc::context::ptr ready_pop_front() {
        fc::context::ptr tmp = nullptr;
        if( ready_head ) {
          tmp        = ready_head;
          ready_head = tmp->next;
          if( !ready_head )
            ready_tail = nullptr;
          tmp->next = nullptr;
        }
        return tmp;
      }

      void ready_push_front( const fc::context::ptr& c ) {
        BOOST_ASSERT( c->next == nullptr );
        BOOST_ASSERT( c != current );
        //if( c == current ) wlog( "pushing current to ready??" );
        c->next = ready_head;
        ready_head = c;
        if( !ready_tail )
          ready_tail = c;
      }

      void ready_push_back( const fc::context::ptr& c ) {
        BOOST_ASSERT( c->next == nullptr );
        BOOST_ASSERT( c != current );
        //if( c == current ) wlog( "pushing current to ready??" );
        c->next = 0;
        if( ready_tail ) {
          ready_tail->next = c;
        } else {
          assert( !ready_head );
          ready_head = c;
        }
        ready_tail = c;
      }
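
      // Heap comparators: task_priority_less keeps the highest-priority task
      // (FIFO within a priority level) at the front of task_pqueue, while
      // task_when_less keeps the earliest-scheduled task at the front of
      // task_sch_queue.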
      struct task_priority_less {
        bool operator()( task_base* a, task_base* b ) {
          return a->_prio.value < b->_prio.value ? true
               : ( a->_prio.value > b->_prio.value ? false
                 : a->_posted_num > b->_posted_num );
        }
      };

      struct task_when_less {
        bool operator()( task_base* a, task_base* b ) {
          return a->_when > b->_when;
        }
      };
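
      // Distribute a linked list of newly posted tasks: tasks scheduled for a
      // future time go on task_sch_queue, tasks that are already due go on
      // task_pqueue.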
      void enqueue( task_base* t ) {
        time_point now = time_point::now();
        task_base* cur = t;
        while( cur ) {
          if( cur->_when > now ) {
            task_sch_queue.push_back(cur);
            std::push_heap( task_sch_queue.begin(),
                            task_sch_queue.end(), task_when_less() );
          } else {
            task_pqueue.push_back(cur);
            BOOST_ASSERT( this == thread::current().my );
            std::push_heap( task_pqueue.begin(),
                            task_pqueue.end(), task_priority_less() );
          }
          cur = cur->_next;
        }
      }
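
      // Claim any tasks posted from other threads, merge them into the heaps,
      // then return the next task to run: a scheduled task that has come due
      // takes precedence, otherwise the highest-priority queued task (or
      // nullptr if nothing is runnable).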
      task_base* dequeue() {
        // get a new task
        BOOST_ASSERT( this == thread::current().my );

        task_base* pending = 0;
        //DLN: changed from memory_order_consume for boost 1.55.
        //This appears to be the safest replacement for now; maybe it
        //can be changed to relaxed later, but that needs analysis.
        pending = task_in_queue.exchange( 0, boost::memory_order_seq_cst );
        if( pending ) { enqueue( pending ); }

        task_base* p(0);
        if( task_sch_queue.size() ) {
          if( task_sch_queue.front()->_when <= time_point::now() ) {
            p = task_sch_queue.front();
            std::pop_heap( task_sch_queue.begin(), task_sch_queue.end(), task_when_less() );
            task_sch_queue.pop_back();
            return p;
          }
        }
        if( task_pqueue.size() ) {
          p = task_pqueue.front();
          std::pop_heap( task_pqueue.begin(), task_pqueue.end(), task_priority_less() );
          task_pqueue.pop_back();
        }
        return p;
      }
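
      // Sweep task_sch_queue for canceled tasks: run and release each one,
      // erase it, and re-heapify if anything was removed. Returns true if at
      // least one canceled task was found.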
      bool process_canceled_tasks() {
        bool canceled_task = false;
        for( auto task_itr = task_sch_queue.begin();
             task_itr != task_sch_queue.end(); ) {
          if( (*task_itr)->canceled() ) {
            (*task_itr)->run();
            (*task_itr)->release();
            task_itr = task_sch_queue.erase(task_itr);
            canceled_task = true;
            continue;
          }
          ++task_itr;
        }

        if( canceled_task )
          std::make_heap( task_sch_queue.begin(), task_sch_queue.end(), task_when_less() );

        return canceled_task;
      }

      /**
       *  This should be called before or after a context switch to
       *  detect quit/cancel operations and throw an exception.
       */
      void check_fiber_exceptions() {
        if( current && current->canceled ) {
#ifdef NDEBUG
          FC_THROW_EXCEPTION( canceled_exception, "" );
#else
          FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: ${reason}",
                              ("reason", current->cancellation_reason ? current->cancellation_reason : "[none given]") );
#endif
        } else if( done ) {
          ilog( "throwing canceled exception" );
          FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: thread quitting" );
          // BOOST_THROW_EXCEPTION( thread_quit() );
        }
      }

      /**
       *  Find the next available context and switch to it.
       *  If none are available then create a new context and
       *  have it wait for something to do.
       */
      bool start_next_fiber( bool reschedule = false ) {
        assert( non_preemptable_scope_count == 0 );
        check_for_timeouts();
        if( !current ) current = new fc::context( &fc::thread::current() );

        // check to see if any other contexts are ready
        if( ready_head ) {
          fc::context* next = ready_pop_front();
          if( next == current ) {
            // elog( "next == current... something went wrong" );
            assert( next != current );
            return false;
          }
          BOOST_ASSERT( next != current );

          // jump to next context, saving current context
          fc::context* prev = current;
          current = next;
          if( reschedule ) ready_push_back(prev);

          // slog( "jump to %p from %p", next, prev );
          // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ("from", int64_t(prev))("to", int64_t(next)) );
#if BOOST_VERSION >= 105600
          bc::jump_fcontext( &prev->my_context, next->my_context, 0 );
#elif BOOST_VERSION >= 105300
          bc::jump_fcontext( prev->my_context, next->my_context, 0 );
#else
          bc::jump_fcontext( &prev->my_context, &next->my_context, 0 );
#endif
          BOOST_ASSERT( current );
          BOOST_ASSERT( current == prev );
          //current = prev;
        } else {
          // all contexts are blocked, create a new context
          // that will process posted tasks...
          fc::context* prev = current;

          fc::context* next = nullptr;
          if( pt_head ) {
            // grab a cached context
            next = pt_head;
            pt_head = pt_head->next;
            next->next = 0;
            next->reinitialize();
          } else {
            // create a new context
            next = new fc::context( &thread_d::start_process_tasks, stack_alloc,
                                    &fc::thread::current() );
          }
          current = next;
          if( reschedule ) ready_push_back(prev);

          // slog( "jump to %p from %p", next, prev );
          // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ("from", int64_t(prev))("to", int64_t(next)) );
#if BOOST_VERSION >= 105600
          bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this );
#elif BOOST_VERSION >= 105300
          bc::jump_fcontext( prev->my_context, next->my_context, (intptr_t)this );
#else
          bc::jump_fcontext( &prev->my_context, &next->my_context, (intptr_t)this );
#endif
          BOOST_ASSERT( current );
          BOOST_ASSERT( current == prev );
          //current = prev;
        }

        if( current->canceled ) {
          //current->canceled = false;
#ifdef NDEBUG
          FC_THROW_EXCEPTION( canceled_exception, "" );
#else
          FC_THROW_EXCEPTION( canceled_exception, "cancellation reason: ${reason}",
                              ("reason", current->cancellation_reason ? current->cancellation_reason : "[none given]") );
#endif
        }

        return true;
      }

      static void start_process_tasks( intptr_t my ) {
        thread_d* self = (thread_d*)my;
        try {
          self->process_tasks();
        } catch ( canceled_exception& ) {
          // allowed exception...
        } catch ( ... ) {
          elog( "fiber ${name} exited with uncaught exception: ${e}",
                ("e", fc::except_str())("name", self->name) );
          // assert( !"fiber exited with uncaught exception" );
          //TODO replace error: fc::cerr<<"fiber exited with uncaught exception:\n "<<
          //  boost::current_exception_diagnostic_information() <<std::endl;
        }
        self->free_list.push_back( self->current );
        self->start_next_fiber( false );
      }
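
      // Dequeue one task, if any, and run it on the current context.
      // Returns true if a task was executed.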
      bool run_next_task() {
        check_for_timeouts();
        task_base* next = dequeue();
        if( next ) {
          next->_set_active_context( current );
          current->cur_task = next;
          next->run();
          current->cur_task = 0;
          next->_set_active_context(0);
          next->release();
          current->reinitialize();
          return true;
        }
        return false;
      }
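
      // True if something is runnable right now: a queued task, a scheduled
      // task that has come due, or a newly posted task waiting to be claimed.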
      bool has_next_task() {
        if( task_pqueue.size() ||
            (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) ||
            task_in_queue.load( boost::memory_order_relaxed ) )
          return true;
        return false;
      }
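
      // Delete contexts whose fibers have finished running.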
      void clear_free_list() {
        for( uint32_t i = 0; i < free_list.size(); ++i ) {
          delete free_list[i];
        }
        free_list.clear();
      }
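
      // Main loop for task-processing fibers: run ready tasks, yield to ready
      // contexts, reap canceled tasks and finished contexts, and otherwise
      // sleep on task_ready until the next task arrives or a timeout expires.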
      void process_tasks() {
        while( !done || blocked ) {
          if( run_next_task() ) continue;

          // if I have something else to do other than
          // process tasks... do it.
          if( ready_head ) {
            pt_push_back( current );
            start_next_fiber(false);
            continue;
          }

          if( process_canceled_tasks() ) continue;

          clear_free_list();

          { // lock scope
            boost::unique_lock<boost::mutex> lock(task_ready_mutex);
            if( has_next_task() ) continue;
            time_point timeout_time = check_for_timeouts();

            if( done ) return;
            if( timeout_time == time_point::maximum() ) {
              task_ready.wait( lock );
            } else if( timeout_time != time_point::min() ) {
              // there may be tasks that have been canceled; we should filter them out now
              // rather than waiting...

              /* This bit is kind of sloppy -- this wait was originally implemented as a wait
               * with respect to boost::chrono::system_clock.  This behaved rather comically
               * if you were to do a:
               *   fc::usleep(fc::seconds(60));
               * and then set your system's clock back a month: it would sleep for a month
               * plus a minute before waking back up (this happened on Linux; it seems
               * Windows' behavior in this case was less unexpected).
               *
               * Boost Chrono's steady_clock will always increase monotonically, so it
               * avoids this behavior.
               *
               * Right now we don't really have a way to distinguish when a timeout_time is coming
               * from a function that takes a relative time like fc::usleep() vs something
               * that takes an absolute time like fc::promise::wait_until(), so we can't always
               * do the right thing here.
               */
              task_ready.wait_until( lock, boost::chrono::steady_clock::now() +
                                           boost::chrono::microseconds( timeout_time.time_since_epoch().count()
                                                                        - time_point::now().time_since_epoch().count() ) );
            }
          }
        }
      }

      /**
       *  Return system_clock::time_point::min() if tasks have timed out.
       *  Return system_clock::time_point::max() if there are no scheduled tasks.
       *  Otherwise return the time the next scheduled task needs to be run.
       */
      time_point check_for_timeouts() {
        if( !sleep_pqueue.size() && !task_sch_queue.size() ) {
          //ilog( "no timeouts ready" );
          return time_point::maximum();
        }

        time_point next = time_point::maximum();
        if( sleep_pqueue.size() && next > sleep_pqueue.front()->resume_time )
          next = sleep_pqueue.front()->resume_time;
        if( task_sch_queue.size() && next > task_sch_queue.front()->_when )
          next = task_sch_queue.front()->_when;

        time_point now = time_point::now();
        if( now < next ) { return next; }

        // move all expired sleeping tasks to the ready queue
        while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) {
          fc::context::ptr c = sleep_pqueue.front();
          std::pop_heap( sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less() );
          //ilog( "sleep pop back..." );
          sleep_pqueue.pop_back();

          if( c->blocking_prom.size() ) {
            // ilog( "timeout blocking prom" );
            c->timeout_blocking_promises();
          } else {
            //ilog( "..." );
            //ilog( "ready_push_front" );
            if( c != current )
              ready_push_front( c );
          }
        }
        return time_point::min();
      }
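
      // Make a blocked context runnable again. If called from a different
      // fc::thread, the request is bounced back to this thread via async().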
      void unblock( fc::context* c ) {
        if( fc::thread::current().my != this ) {
          self.async( [=](){ unblock(c); }, "thread_d::unblock" );
          return;
        }
        if( c != current ) ready_push_front(c);
      }
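
      // Sleep the current context until approximately 'tp' (a no-op if tp is
      // within ~10ms of now), removing it from the sleep queue once it resumes.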
      void yield_until( const time_point& tp, bool reschedule ) {
        check_fiber_exceptions();

        if( tp <= (time_point::now() + fc::microseconds(10000)) )
          return;

        if( !current ) {
          current = new fc::context( &fc::thread::current() );
        }

        current->resume_time = tp;
        current->clear_blocking_promises();

        sleep_pqueue.push_back(current);
        std::push_heap( sleep_pqueue.begin(),
                        sleep_pqueue.end(), sleep_priority_less() );

        start_next_fiber(reschedule);

        // clear current context from the sleep queue...
        for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) {
          if( sleep_pqueue[i] == current ) {
            sleep_pqueue[i] = sleep_pqueue.back();
            sleep_pqueue.pop_back();
            std::make_heap( sleep_pqueue.begin(),
                            sleep_pqueue.end(), sleep_priority_less() );
            break;
          }
        }

        current->resume_time = time_point::maximum();
        check_fiber_exceptions();
      }
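
      // Block the current context on promise 'p' until it becomes ready or
      // 'timeout' passes; a finite timeout also places the context on the
      // sleep queue so check_for_timeouts() can expire the wait.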
      void wait( const promise_base::ptr& p, const time_point& timeout ) {
        if( p->ready() ) return;
        if( timeout < time_point::now() )
          FC_THROW_EXCEPTION( timeout_exception, "" );

        if( !current ) {
          current = new fc::context( &fc::thread::current() );
        }

        //slog( "   %1% blocking on %2%", current, p.get() );
        current->add_blocking_promise( p.get(), true );

        // if the timeout is not the maximum, add the context to the sleep pqueue
        if( timeout != time_point::maximum() ) {
          current->resume_time = timeout;
          sleep_pqueue.push_back(current);
          std::push_heap( sleep_pqueue.begin(),
                          sleep_pqueue.end(),
                          sleep_priority_less() );
        }

        // elog( "blocking %1%", current );
        add_to_blocked( current );
        // debug( "switching fibers..." );

        start_next_fiber();
        // slog( "resuming %1%", current );
        //slog( "    %1% unblocking blocking on %2%", current, p.get() );
        current->remove_blocking_promise( p.get() );

        check_fiber_exceptions();
      }
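
      // Run the registered cleanup callbacks for this thread's non-task-specific
      // and thread-specific data slots.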
      void cleanup_thread_specific_data() {
        for( auto iter = non_task_specific_data.begin(); iter != non_task_specific_data.end(); ++iter )
          if( iter->cleanup )
            iter->cleanup( iter->value );

        for( auto iter = thread_specific_data.begin(); iter != thread_specific_data.end(); ++iter )
          if( iter->cleanup )
            iter->cleanup( iter->value );
      }
  };
} // namespace fc