From 130da3623e6483ba71ac15422b8127840b45dc31 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 8 Sep 2014 10:31:07 -0400 Subject: [PATCH] Allow fibers to be canceled while they're sleeping or blocked on promises --- include/fc/thread/thread.hpp | 5 ++++- src/thread/task.cpp | 1 + src/thread/thread.cpp | 5 +++++ src/thread/thread_d.hpp | 38 ++++++++++++++++++++++++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index b7060d0..0f69900 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -128,6 +128,7 @@ namespace fc { private: thread( class thread_d* ); friend class promise_base; + friend class task_base; friend class thread_d; friend class mutex; friend void* detail::get_thread_specific_data(unsigned slot); @@ -154,8 +155,10 @@ namespace fc { void async_task( task_base* t, const priority& p ); void async_task( task_base* t, const priority& p, const time_point& tp ); - class thread_d* my; + void notify_task_has_been_canceled(); + + class thread_d* my; }; /** diff --git a/src/thread/task.cpp b/src/thread/task.cpp index 4d9acfc..9f6b6f0 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -69,6 +69,7 @@ namespace fc { #ifndef NDEBUG _active_context->cancellation_reason = reason; #endif + _active_context->ctx_thread->notify_task_has_been_canceled(); } } diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index e9b8efb..ea12d24 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -418,6 +418,11 @@ namespace fc { return this == ¤t(); } + void thread::notify_task_has_been_canceled() + { + async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); + } + #ifdef _MSC_VER /* support for providing a structured exception handler for async tasks */ namespace detail diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 3fa64d8..9af2762 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -606,5 +606,43 @@ namespace fc { iter->cleanup(iter->value); } + void notify_task_has_been_canceled() + { + for (fc::context** iter = &blocked; *iter;) + { + if ((*iter)->canceled) + { + fc::context* next_blocked = (*iter)->next_blocked; + (*iter)->next_blocked = nullptr; + ready_push_front(*iter); + *iter = next_blocked; + continue; + } + iter = &(*iter)->next_blocked; + } + + bool task_removed_from_sleep_pqueue = false; + for (auto sleep_iter = sleep_pqueue.begin(); sleep_iter != sleep_pqueue.end();) + { + if ((*sleep_iter)->canceled) + { + bool already_on_ready_list = false; + for (fc::context* ready_iter = ready_head; ready_iter; ready_iter = ready_iter->next) + if (ready_iter == *sleep_iter) + { + already_on_ready_list = true; + break; + } + if (!already_on_ready_list) + ready_push_front(*sleep_iter); + sleep_iter = sleep_pqueue.erase(sleep_iter); + task_removed_from_sleep_pqueue = true; + } + else + ++sleep_iter; + } + if (task_removed_from_sleep_pqueue) + std::make_heap(sleep_pqueue.begin(), sleep_pqueue.end(), sleep_priority_less()); + } }; } // namespace fc