canceled scheduled tasks return immediately when waited upon
This commit is contained in:
parent
ffa617183e
commit
892df5c0b7
3 changed files with 37 additions and 23 deletions
|
|
@ -226,6 +226,9 @@ target_link_libraries( fc PUBLIC udt easylzma_static ${Boost_LIBRARIES} ${OPENSS
|
|||
add_executable( ntp_test ntp_test.cpp )
|
||||
target_link_libraries( ntp_test fc )
|
||||
|
||||
add_executable( task_cancel_test tests/task_cancel.cpp )
|
||||
target_link_libraries( task_cancel_test fc )
|
||||
|
||||
#include_directories( vendor/udt4/src )
|
||||
|
||||
add_executable( udt_server tests/udts.cpp )
|
||||
|
|
|
|||
|
|
@ -200,7 +200,7 @@ namespace fc {
|
|||
void thread::exec() {
|
||||
if( !my->current ) my->current = new fc::context(&fc::thread::current());
|
||||
try {
|
||||
my->process_tasks();
|
||||
my->process_tasks();
|
||||
}
|
||||
catch( canceled_exception& )
|
||||
{
|
||||
|
|
@ -225,35 +225,15 @@ namespace fc {
|
|||
my->start_next_fiber(reschedule);
|
||||
my->check_fiber_exceptions();
|
||||
}
|
||||
|
||||
void thread::sleep_until( const time_point& tp ) {
|
||||
//ilog( "sleep until ${tp} wait: ${delta}", ("tp",tp)("delta",(tp-fc::time_point::now()).count()) );
|
||||
|
||||
if( tp <= (time_point::now()+fc::microseconds(10000)) )
|
||||
{
|
||||
this->yield(true);
|
||||
}
|
||||
my->yield_until( tp, false );
|
||||
/*
|
||||
my->check_fiber_exceptions();
|
||||
|
||||
BOOST_ASSERT( ¤t() == this );
|
||||
if( !my->current ) {
|
||||
my->current = new fc::context(&fc::thread::current());
|
||||
}
|
||||
|
||||
my->current->resume_time = tp;
|
||||
my->current->clear_blocking_promises();
|
||||
|
||||
my->sleep_pqueue.push_back(my->current);
|
||||
std::push_heap( my->sleep_pqueue.begin(),
|
||||
my->sleep_pqueue.end(), sleep_priority_less() );
|
||||
|
||||
my->start_next_fiber();
|
||||
my->current->resume_time = time_point::maximum();
|
||||
|
||||
my->check_fiber_exceptions();
|
||||
*/
|
||||
}
|
||||
|
||||
int thread::wait_any_until( std::vector<promise_base::ptr>&& p, const time_point& timeout) {
|
||||
for( size_t i = 0; i < p.size(); ++i ) {
|
||||
if( p[i]->ready() ) return i;
|
||||
|
|
|
|||
|
|
@ -239,6 +239,31 @@ namespace fc {
|
|||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
bool process_canceled_tasks()
|
||||
{
|
||||
bool canceled_task = false;
|
||||
for( auto task_itr = task_sch_queue.begin();
|
||||
task_itr != task_sch_queue.end();
|
||||
)
|
||||
{
|
||||
if( (*task_itr)->canceled() )
|
||||
{
|
||||
(*task_itr)->run();
|
||||
(*task_itr)->release();
|
||||
*task_itr = task_sch_queue.back();
|
||||
task_sch_queue.pop_back();
|
||||
canceled_task = true;
|
||||
continue;
|
||||
}
|
||||
++task_itr;
|
||||
}
|
||||
|
||||
if( canceled_task )
|
||||
std::make_heap( task_sch_queue.begin(), task_sch_queue.end(), task_when_less() );
|
||||
|
||||
return canceled_task;
|
||||
}
|
||||
|
||||
/**
|
||||
* This should be before or after a context switch to
|
||||
|
|
@ -378,6 +403,8 @@ namespace fc {
|
|||
continue;
|
||||
}
|
||||
|
||||
if( process_canceled_tasks() ) continue;
|
||||
|
||||
clear_free_list();
|
||||
|
||||
{ // lock scope
|
||||
|
|
@ -389,6 +416,10 @@ namespace fc {
|
|||
if( timeout_time == time_point::maximum() ) {
|
||||
task_ready.wait( lock );
|
||||
} else if( timeout_time != time_point::min() ) {
|
||||
// there may be tasks that have been canceled we should filter them out now
|
||||
// rather than waiting...
|
||||
|
||||
|
||||
/* This bit is kind of sloppy -- this wait was originally implemented as a wait
|
||||
* with respect to boost::chrono::system_clock. This behaved rather comically
|
||||
* if you were to do a:
|
||||
|
|
|
|||
Loading…
Reference in a new issue