This commit is contained in:
dnotestein 2014-09-08 10:47:16 -04:00
commit 1360eabd8c
11 changed files with 167 additions and 49 deletions

View file

@ -1,9 +1,19 @@
#pragma once
#if defined(_MSC_VER) && _MSC_VER >= 1400
#pragma warning(push)
#pragma warning(disable:4996)
#endif
#include <boost/signals2/signal.hpp>
#if defined(_MSC_VER) && _MSC_VER >= 1400
#pragma warning(pop)
#endif
#include <fc/thread/future.hpp>
#include <fc/thread/thread.hpp>
namespace fc {
#if !defined(BOOST_NO_TEMPLATE_ALIASES)
template<typename T>
@ -37,4 +47,3 @@ namespace fc {
p->wait( timeout_us );
}
}

View file

@ -1,7 +1,4 @@
#pragma once
namespace boost {
template<typename T> class atomic;
}
namespace fc {
class microseconds;

View file

@ -1,9 +1,5 @@
#pragma once
namespace boost {
template<typename T> class atomic;
}
namespace fc {
class microseconds;
class time_point;

View file

@ -128,6 +128,7 @@ namespace fc {
private:
thread( class thread_d* );
friend class promise_base;
friend class task_base;
friend class thread_d;
friend class mutex;
friend void* detail::get_thread_specific_data(unsigned slot);
@ -154,8 +155,10 @@ namespace fc {
void async_task( task_base* t, const priority& p );
void async_task( task_base* t, const priority& p, const time_point& tp );
class thread_d* my;
void notify_task_has_been_canceled();
class thread_d* my;
};
/**

View file

@ -44,8 +44,9 @@ namespace fc
}
template<typename T>
void skip_white_space( T& in )
bool skip_white_space( T& in )
{
bool skipped = false;
while( true )
{
switch( in.peek() )
@ -54,10 +55,11 @@ namespace fc
case '\t':
case '\n':
case '\r':
skipped = true;
in.get();
break;
default:
return;
return skipped;
}
}
}
@ -162,8 +164,9 @@ namespace fc
if( in.peek() == ',' )
{
in.get();
continue;
}
skip_white_space(in);
if( skip_white_space(in) ) continue;
string key = stringFromStream( in );
skip_white_space(in);
if( in.peek() != ':' )
@ -207,8 +210,12 @@ namespace fc
while( in.peek() != ']' )
{
while( in.peek() == ',' )
if( in.peek() == ',' )
{
in.get();
continue;
}
if( skip_white_space(in) ) continue;
ar.push_back( variant_from_stream(in) );
skip_white_space(in);
}
@ -400,11 +407,8 @@ namespace fc
case EOF:
FC_THROW_EXCEPTION( eof_exception, "unexpected end of file" );
default:
// ilog( "unhandled char '${c}' int ${int}", ("c", fc::string( &c, 1 ) )("int", int(c)) );
return stringFromToken(in);
in.get(); //
ilog( "unhandled char '${c}' int ${int}", ("c", fc::string( &c, 1 ) )("int", int(c)) );
return variant();
FC_THROW_EXCEPTION( parse_error_exception, "Unexpected char '${c}' in \"${s}\"",
("c", c)("s", stringFromToken(in)) );
}
}
return variant();

View file

@ -7,10 +7,18 @@
#include <boost/version.hpp>
#if BOOST_VERSION >= 105400
#include <boost/coroutine/stack_context.hpp>
#include <boost/coroutine/stack_allocator.hpp>
# include <boost/coroutine/stack_context.hpp>
namespace bc = boost::context;
namespace bco = boost::coroutines;
# if BOOST_VERSION >= 105600 && !defined(NDEBUG)
# include <boost/assert.hpp>
# include <boost/coroutine/protected_stack_allocator.hpp>
typedef bco::protected_stack_allocator stack_allocator;
# else
# include <boost/coroutine/stack_allocator.hpp>
typedef bco::stack_allocator stack_allocator;
# endif
#elif BOOST_VERSION >= 105300
#include <boost/coroutine/stack_allocator.hpp>
namespace bc = boost::context;
@ -35,12 +43,12 @@ namespace fc {
struct context {
typedef fc::context* ptr;
#if BOOST_VERSION >= 105400
#if BOOST_VERSION >= 105400
bco::stack_context stack_ctx;
#endif
#endif
context( void (*sf)(intptr_t), bco::stack_allocator& alloc, fc::thread* t )
context( void (*sf)(intptr_t), stack_allocator& alloc, fc::thread* t )
: caller_context(0),
stack_alloc(&alloc),
next_blocked(0),
@ -54,7 +62,11 @@ namespace fc {
complete(false),
cur_task(0)
{
#if BOOST_VERSION >= 105400
#if BOOST_VERSION >= 105600
size_t stack_size = stack_allocator::traits_type::default_size() * 4;
alloc.allocate(stack_ctx, stack_size);
my_context = bc::make_fcontext( stack_ctx.sp, stack_ctx.size, sf);
#elif BOOST_VERSION >= 105400
size_t stack_size = bco::stack_allocator::default_stacksize() * 4;
alloc.allocate(stack_ctx, stack_size);
my_context = bc::make_fcontext( stack_ctx.sp, stack_ctx.size, sf);
@ -71,7 +83,9 @@ namespace fc {
}
context( fc::thread* t) :
#if BOOST_VERSION >= 105300
#if BOOST_VERSION >= 105600
my_context(nullptr),
#elif BOOST_VERSION >= 105300
my_context(new bc::fcontext_t),
#endif
caller_context(0),
@ -89,20 +103,17 @@ namespace fc {
{}
~context() {
#if BOOST_VERSION >= 105400
#if BOOST_VERSION >= 105600
if(stack_alloc)
stack_alloc->deallocate( stack_ctx );
#elif BOOST_VERSION >= 105400
if(stack_alloc)
stack_alloc->deallocate( stack_ctx );
else
delete my_context;
#elif BOOST_VERSION >= 105400
if(stack_alloc)
stack_alloc->deallocate( my_context->fc_stack.sp, bco::stack_allocator::default_stacksize() * 4 );
else
delete my_context;
#elif BOOST_VERSION >= 105300
if(stack_alloc)
stack_alloc->deallocate( my_context->fc_stack.sp, bco::stack_allocator::default_stacksize() * 4);
stack_alloc->deallocate( my_context->fc_stack.sp, stack_allocator::default_stacksize() * 4);
else
delete my_context;
#else
@ -196,13 +207,13 @@ namespace fc {
#if BOOST_VERSION >= 105300
#if BOOST_VERSION >= 105300 && BOOST_VERSION < 105600
bc::fcontext_t* my_context;
#else
bc::fcontext_t my_context;
#endif
fc::context* caller_context;
bco::stack_allocator* stack_alloc;
stack_allocator* stack_alloc;
priority prio;
//promise_base* prom;
std::vector<blocked_promise> blocking_prom;

View file

@ -69,6 +69,7 @@ namespace fc {
#ifndef NDEBUG
_active_context->cancellation_reason = reason;
#endif
_active_context->ctx_thread->notify_task_has_been_canceled();
}
}

View file

@ -418,6 +418,11 @@ namespace fc {
return this == &current();
}
void thread::notify_task_has_been_canceled()
{
async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() );
}
#ifdef _MSC_VER
/* support for providing a structured exception handler for async tasks */
namespace detail

View file

@ -68,26 +68,26 @@ namespace fc {
}
fc::thread& self;
boost::thread* boost_thread;
bco::stack_allocator stack_alloc;
stack_allocator stack_alloc;
boost::condition_variable task_ready;
boost::mutex task_ready_mutex;
boost::atomic<task_base*> task_in_queue;
std::vector<task_base*> task_pqueue;
std::vector<task_base*> task_sch_queue;
std::vector<fc::context*> sleep_pqueue;
std::vector<fc::context*> free_list;
std::vector<task_base*> task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time
std::vector<task_base*> task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run
std::vector<fc::context*> sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume
std::vector<fc::context*> free_list; // list of unused contexts that are ready for deletion
bool done;
fc::string name;
fc::context* current;
fc::context* current; // the currently-executing task in this thread
fc::context* pt_head;
fc::context* pt_head; // list of contexts that can be reused for new tasks
fc::context* ready_head;
fc::context* ready_head; // linked list (using 'next') of contexts that are ready to run
fc::context* ready_tail;
fc::context* blocked;
fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait()
// values for thread specific data objects for this thread
std::vector<detail::specific_data_info> thread_specific_data;
@ -321,8 +321,10 @@ namespace fc {
current = next;
if( reschedule ) ready_push_back(prev);
// slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 105300
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 105600
bc::jump_fcontext( &prev->my_context, next->my_context, 0 );
#elif BOOST_VERSION >= 105300
bc::jump_fcontext( prev->my_context, next->my_context, 0 );
#else
bc::jump_fcontext( &prev->my_context, &next->my_context, 0 );
@ -350,7 +352,9 @@ namespace fc {
// slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 105300
#if BOOST_VERSION >= 105600
bc::jump_fcontext( &prev->my_context, next->my_context, (intptr_t)this );
#elif BOOST_VERSION >= 105300
bc::jump_fcontext( prev->my_context, next->my_context, (intptr_t)this );
#else
bc::jump_fcontext( &prev->my_context, &next->my_context, (intptr_t)this );
@ -602,5 +606,43 @@ namespace fc {
iter->cleanup(iter->value);
}
void notify_task_has_been_canceled()
{
for (fc::context** iter = &blocked; *iter;)
{
if ((*iter)->canceled)
{
fc::context* next_blocked = (*iter)->next_blocked;
(*iter)->next_blocked = nullptr;
ready_push_front(*iter);
*iter = next_blocked;
continue;
}
iter = &(*iter)->next_blocked;
}
bool task_removed_from_sleep_pqueue = false;
for (auto sleep_iter = sleep_pqueue.begin(); sleep_iter != sleep_pqueue.end();)
{
if ((*sleep_iter)->canceled)
{
bool already_on_ready_list = false;
for (fc::context* ready_iter = ready_head; ready_iter; ready_iter = ready_iter->next)
if (ready_iter == *sleep_iter)
{
already_on_ready_list = true;
break;
}
if (!already_on_ready_list)
ready_push_front(*sleep_iter);
sleep_iter = sleep_pqueue.erase(sleep_iter);
task_removed_from_sleep_pqueue = true;
}
else
++sleep_iter;
}
if (task_removed_from_sleep_pqueue)
std::make_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less());
}
};
} // namespace fc

View file

@ -535,6 +535,7 @@ void from_variant( const variant& var, uint32_t& vo )
vo = static_cast<uint32_t>(var.as_uint64());
}
void to_variant( const int32_t& var, variant& vo ) { vo = int64_t(var); }
void from_variant( const variant& var, int32_t& vo )
{
vo = static_cast<int32_t>(var.as_int64());

View file

@ -93,7 +93,7 @@ BOOST_AUTO_TEST_CASE( test_non_preemptable_assertion )
BOOST_TEST_PASSPOINT();
}
BOOST_AUTO_TEST_CASE( cancel_an_active_task )
BOOST_AUTO_TEST_CASE( cancel_an_sleeping_task )
{
enum task_result{sleep_completed, sleep_aborted};
fc::future<task_result> task = fc::async([]() {
@ -128,6 +128,55 @@ BOOST_AUTO_TEST_CASE( cancel_an_active_task )
}
}
BOOST_AUTO_TEST_CASE( cancel_a_task_waiting_on_promise )
{
enum task_result{task_completed, task_aborted};
fc::promise<void>::ptr promise_to_wait_on(new fc::promise<void>());
fc::future<task_result> task = fc::async([promise_to_wait_on]() {
BOOST_TEST_MESSAGE("Starting async task");
try
{
promise_to_wait_on->wait_until(fc::time_point::now() + fc::seconds(5));
return task_completed;
}
catch (const fc::canceled_exception&)
{
BOOST_TEST_MESSAGE("Caught canceled_exception inside task-to-be-canceled");
throw;
}
catch (const fc::timeout_exception&)
{
BOOST_TEST_MESSAGE("Caught timeout_exception inside task-to-be-canceled");
throw;
}
catch (const fc::exception& e)
{
BOOST_TEST_MESSAGE("Caught unexpected exception inside task-to-be-canceled: " << e.to_detail_string());
return task_aborted;
}
}, "test_task");
fc::time_point start_time = fc::time_point::now();
// wait a bit for the task to start running
fc::usleep(fc::milliseconds(100));
BOOST_TEST_MESSAGE("Canceling task");
task.cancel("canceling to test if cancel works");
//promise_to_wait_on->set_value();
try
{
task_result result = task.wait();
BOOST_CHECK_MESSAGE(result != task_completed, "task should have been canceled");
}
catch (fc::exception& e)
{
BOOST_TEST_MESSAGE("Caught exception from canceled task: " << e.what());
BOOST_CHECK_MESSAGE(fc::time_point::now() - start_time < fc::seconds(4), "Task was not canceled quickly");
}
}
BOOST_AUTO_TEST_CASE( cleanup_cancelled_task )
{