From 27096f15a797c7551fbeb9f9ef7df99cc2ce19a5 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Sun, 24 Aug 2014 18:33:05 -0400 Subject: [PATCH 1/6] Log the names of tasks to the log file (now contains thread_name:task_name} --- include/fc/log/log_message.hpp | 1 + include/fc/thread/thread.hpp | 2 ++ src/log/file_appender.cpp | 2 +- src/log/log_message.cpp | 6 ++++++ src/thread/thread.cpp | 8 ++++++++ 5 files changed, 18 insertions(+), 1 deletion(-) diff --git a/include/fc/log/log_message.hpp b/include/fc/log/log_message.hpp index 411c213..1928a98 100644 --- a/include/fc/log/log_message.hpp +++ b/include/fc/log/log_message.hpp @@ -68,6 +68,7 @@ namespace fc uint64_t get_line_number()const; string get_method()const; string get_thread_name()const; + string get_task_name()const; string get_host_name()const; time_point get_timestamp()const; log_level get_log_level()const; diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index 1b99b80..10fd05b 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -32,6 +32,8 @@ namespace fc { */ void set_name( const string& n ); + const char* current_task_desc() const; + /** * @brief print debug info about the state of every context / promise. * diff --git a/src/log/file_appender.cpp b/src/log/file_appender.cpp index 014fd09..92c2848 100644 --- a/src/log/file_appender.cpp +++ b/src/log/file_appender.cpp @@ -192,7 +192,7 @@ namespace fc { std::stringstream line; //line << (m.get_context().get_timestamp().time_since_epoch().count() % (1000ll*1000ll*60ll*60))/1000 <<"ms "; line << std::string(m.get_context().get_timestamp()) << " "; - line << std::setw( 10 ) << m.get_context().get_thread_name().substr(0,9).c_str() <<" "; + line << std::setw( 21 ) << (m.get_context().get_thread_name().substr(0,9) + string(":") + m.get_context().get_task_name()).c_str() <<" "; auto me = m.get_context().get_method(); // strip all leading scopes... diff --git a/src/log/log_message.cpp b/src/log/log_message.cpp index 696099a..7e26cf4 100644 --- a/src/log/log_message.cpp +++ b/src/log/log_message.cpp @@ -20,6 +20,7 @@ namespace fc uint64_t line; string method; string thread_name; + string task_name; string hostname; string context; time_point timestamp; @@ -53,6 +54,8 @@ namespace fc my->method = method; my->timestamp = time_point::now(); my->thread_name = fc::thread::current().name(); + const char* current_task_desc = fc::thread::current().current_task_desc(); + my->task_name = current_task_desc ? current_task_desc : "?unnamed?"; } log_context::log_context( const variant& v ) @@ -65,6 +68,8 @@ namespace fc my->method = obj["method"].as_string(); my->hostname = obj["hostname"].as_string(); my->thread_name = obj["thread_name"].as_string(); + if (obj.contains("task_name")) + my->task_name = obj["task_name"].as_string(); my->timestamp = obj["timestamp"].as(); if( obj.contains( "context" ) ) my->context = obj["context"].as(); @@ -149,6 +154,7 @@ namespace fc uint64_t log_context::get_line_number()const { return my->line; } string log_context::get_method()const { return my->method; } string log_context::get_thread_name()const { return my->thread_name; } + string log_context::get_task_name()const { return my->task_name; } string log_context::get_host_name()const { return my->hostname; } time_point log_context::get_timestamp()const { return my->timestamp; } log_level log_context::get_log_level()const{ return my->level; } diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index dfa1b87..bfd13fa 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -123,6 +123,14 @@ namespace fc { } const string& thread::name()const { return my->name; } void thread::set_name( const fc::string& n ) { my->name = n; } + + const char* thread::current_task_desc() const + { + if (my->current && my->current->cur_task) + return my->current->cur_task->get_desc(); + return NULL; + } + void thread::debug( const fc::string& d ) { /*my->debug(d);*/ } void thread::quit() { From d188b138d6f439904b04b89b4b9812fde17ffe01 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 25 Aug 2014 18:43:12 -0400 Subject: [PATCH 2/6] Allow fc::canceled_exception to pass through places where we were catching and ignoring all fc::exceptions --- src/network/ntp.cpp | 4 ++++ src/thread/future.cpp | 9 ++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index aae4f5c..9febe47 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -60,6 +60,10 @@ namespace fc break; } } + catch (const fc::canceled_exception&) + { + throw; + } // this could fail to resolve but we want to go on to other hosts.. catch ( const fc::exception& e ) { diff --git a/src/thread/future.cpp b/src/thread/future.cpp index 0ea1582..9ccc47f 100644 --- a/src/thread/future.cpp +++ b/src/thread/future.cpp @@ -44,13 +44,16 @@ namespace fc { } void promise_base::_wait( const microseconds& timeout_us ){ - if( timeout_us == microseconds::maximum() ) _wait_until( time_point::maximum() ); - else _wait_until( time_point::now() + timeout_us ); + if( timeout_us == microseconds::maximum() ) + _wait_until( time_point::maximum() ); + else + _wait_until( time_point::now() + timeout_us ); } void promise_base::_wait_until( const time_point& timeout_us ){ { synchronized(_spin_yield) if( _ready ) { - if( _exceptp ) _exceptp->dynamic_rethrow_exception(); + if( _exceptp ) + _exceptp->dynamic_rethrow_exception(); return; } _enqueue_thread(); From 976bbce66864546fa5b8cd5bc4824b3b565f81e1 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 25 Aug 2014 18:44:15 -0400 Subject: [PATCH 3/6] When locking a mutex, ensure the task has a context before attempting to lock. --- src/thread/mutex.cpp | 53 +++++++++++++++++++++++++------------------ tests/task_cancel.cpp | 43 ++++++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/src/thread/mutex.cpp b/src/thread/mutex.cpp index 40e2039..8119ae7 100644 --- a/src/thread/mutex.cpp +++ b/src/thread/mutex.cpp @@ -23,19 +23,22 @@ namespace fc { } /** - * @param next - is set to the next context to get the lock. + * @param last_context - is set to the next context to get the lock (the next-to-last element of the list) * @return the last context (the one with the lock) */ - static fc::context* get_tail( fc::context* h, fc::context*& next ) { - next = 0; - fc::context* n = h; - if( !n ) return n; - while( n->next_blocked_mutex ) { - next = n; - n=n->next_blocked_mutex; + static fc::context* get_tail( fc::context* list_head, fc::context*& context_to_unblock ) { + context_to_unblock = 0; + fc::context* list_context_iter = list_head; + if( !list_context_iter ) + return list_context_iter; + while( list_context_iter->next_blocked_mutex ) + { + context_to_unblock = list_context_iter; + list_context_iter = list_context_iter->next_blocked_mutex; } - return n; + return list_context_iter; } + static fc::context* remove( fc::context* head, fc::context* target ) { fc::context* c = head; fc::context* p = 0; @@ -114,17 +117,21 @@ namespace fc { } void mutex::lock() { - fc::context* n = 0; - fc::context* cc = fc::thread::current().my->current; + fc::context* current_context = fc::thread::current().my->current; + if( !current_context ) + current_context = fc::thread::current().my->current = new fc::context( &fc::thread::current() ); + { fc::unique_lock lock(m_blist_lock); if( !m_blist ) { - m_blist = cc; + m_blist = current_context; + assert(!current_context->next_blocked_mutex); return; } // allow recusive locks - if ( get_tail( m_blist, n ) == cc ) { + fc::context* dummy_context_to_unblock = 0; + if ( get_tail( m_blist, dummy_context_to_unblock ) == current_context ) { assert(false); // EMF: I think recursive locks are currently broken -- we need to // keep track of how many times this mutex has been locked by the @@ -132,9 +139,10 @@ namespace fc { // the next context only if the count drops to zero return; } - cc->next_blocked_mutex = m_blist; - m_blist = cc; + current_context->next_blocked_mutex = m_blist; + m_blist = current_context; +#if 0 int cnt = 0; auto i = m_blist; while( i ) { @@ -142,25 +150,26 @@ namespace fc { ++cnt; } //wlog( "wait queue len %1%", cnt ); +#endif } try { fc::thread::current().yield(false); - BOOST_ASSERT( cc->next_blocked_mutex == 0 ); + BOOST_ASSERT( current_context->next_blocked_mutex == 0 ); } catch ( ... ) { wlog( "lock threw" ); - cleanup( *this, m_blist_lock, m_blist, cc); + cleanup( *this, m_blist_lock, m_blist, current_context); throw; } } void mutex::unlock() { - fc::context* next = 0; + fc::context* context_to_unblock = 0; { fc::unique_lock lock(m_blist_lock); - get_tail(m_blist, next); - if( next ) { - next->next_blocked_mutex = 0; - next->ctx_thread->my->unblock( next ); + get_tail(m_blist, context_to_unblock); + if( context_to_unblock ) { + context_to_unblock->next_blocked_mutex = 0; + context_to_unblock->ctx_thread->my->unblock( context_to_unblock ); } else { m_blist = 0; } diff --git a/tests/task_cancel.cpp b/tests/task_cancel.cpp index 32f9fec..0d6500f 100644 --- a/tests/task_cancel.cpp +++ b/tests/task_cancel.cpp @@ -12,13 +12,54 @@ BOOST_AUTO_TEST_CASE( leave_mutex_locked ) { { fc::mutex test_mutex; - fc::future test_task = fc::async([&](){ fc::scoped_lock test_lock(test_mutex); for (int i = 0; i < 10; ++i) fc::usleep(fc::seconds(1));}); + fc::future test_task = fc::async([&](){ + fc::scoped_lock test_lock(test_mutex); + for (int i = 0; i < 10; ++i) + fc::usleep(fc::seconds(1)); + }, "test_task"); fc::usleep(fc::seconds(3)); test_task.cancel_and_wait(); } BOOST_TEST_PASSPOINT(); } +BOOST_AUTO_TEST_CASE( cancel_task_blocked_on_mutex) +{ + { + fc::mutex test_mutex; + fc::future test_task; + { + fc::scoped_lock test_lock(test_mutex); + test_task = fc::async([&test_mutex](){ + BOOST_TEST_MESSAGE("--- In test_task, locking mutex"); + fc::scoped_lock async_task_test_lock(test_mutex); + BOOST_TEST_MESSAGE("--- In test_task, mutex locked, commencing sleep"); + for (int i = 0; i < 10; ++i) + fc::usleep(fc::seconds(1)); + BOOST_TEST_MESSAGE("--- In test_task, sleeps done, exiting"); + }, "test_task"); + fc::usleep(fc::seconds(3)); + test_task.cancel(); + try + { + test_task.wait(fc::seconds(1)); + BOOST_ERROR("test should have been canceled, not exited cleanly"); + } + catch (const fc::canceled_exception&) + { + BOOST_TEST_PASSPOINT(); + } + catch (const fc::timeout_exception&) + { + BOOST_ERROR("unable to cancel task blocked on mutex"); + } + BOOST_TEST_MESSAGE("Unlocking mutex locked from the main task so the test task will have the opportunity to lock it and be canceled"); + } + test_task.cancel_and_wait(); + } +} + + BOOST_AUTO_TEST_CASE( test_non_preemptable_assertion ) { return; // this isn't a real test, because the thing it tries to test works by asserting, not by throwing From d9e6a9e56846fa05c428c7f3eddb6df1e604ea91 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Wed, 27 Aug 2014 11:55:14 -0400 Subject: [PATCH 4/6] When a task is canceled while blocking on a mutex, fix the code that removes it from the mutex's block list to null out its "next" pointer, which is assumed to be null whenever not blocked on a mutex --- src/thread/mutex.cpp | 56 ++++++++++++++++++++++++++++++------------- tests/task_cancel.cpp | 4 +++- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/thread/mutex.cpp b/src/thread/mutex.cpp index 8119ae7..6ea85af 100644 --- a/src/thread/mutex.cpp +++ b/src/thread/mutex.cpp @@ -11,15 +11,18 @@ namespace fc { :m_blist(0){} mutex::~mutex() { - if( m_blist ) { - auto c = m_blist; + if( m_blist ) + { + context* c = m_blist; fc::thread::current().debug("~mutex"); +#if 0 while( c ) { // elog( "still blocking on context %p (%s)", m_blist, (m_blist->cur_task ? m_blist->cur_task->get_desc() : "no current task") ); c = c->next_blocked_mutex; } +#endif + BOOST_ASSERT( false && "Attempt to free mutex while others are blocking on lock." ); } - BOOST_ASSERT( !m_blist && "Attempt to free mutex while others are blocking on lock." ); } /** @@ -40,18 +43,21 @@ namespace fc { } static fc::context* remove( fc::context* head, fc::context* target ) { - fc::context* c = head; - fc::context* p = 0; - while( c ) { - if( c == target ) { - if( p ) { - p->next_blocked_mutex = c->next_blocked_mutex; + fc::context* context_iter = head; + fc::context* previous = 0; + while( context_iter ) + { + if( context_iter == target ) + { + if( previous ) + { + previous->next_blocked_mutex = context_iter->next_blocked_mutex; return head; } - return c->next_blocked_mutex; + return context_iter->next_blocked_mutex; } - p = c; - c = c->next_blocked_mutex; + previous = context_iter; + context_iter = context_iter->next_blocked_mutex; } return head; } @@ -59,7 +65,8 @@ namespace fc { { fc::unique_lock lock(syl); if( cc->next_blocked_mutex ) { - bl = remove(bl, cc ); + bl = remove(bl, cc); + cc->next_blocked_mutex = nullptr; return; } } @@ -123,7 +130,9 @@ namespace fc { { fc::unique_lock lock(m_blist_lock); - if( !m_blist ) { + if( !m_blist ) + { + // nobody else owns the mutex, so we get it; add our context as the last and only element on the mutex's list m_blist = current_context; assert(!current_context->next_blocked_mutex); return; @@ -132,6 +141,8 @@ namespace fc { // allow recusive locks fc::context* dummy_context_to_unblock = 0; if ( get_tail( m_blist, dummy_context_to_unblock ) == current_context ) { + // if we already have the lock (meaning we're on the tail of the list) then + // we shouldn't be trying to grab the lock again assert(false); // EMF: I think recursive locks are currently broken -- we need to // keep track of how many times this mutex has been locked by the @@ -139,6 +150,7 @@ namespace fc { // the next context only if the count drops to zero return; } + // add ourselves to the head of the list current_context->next_blocked_mutex = m_blist; m_blist = current_context; @@ -153,11 +165,21 @@ namespace fc { #endif } - try { + try + { fc::thread::current().yield(false); + // if yield() returned normally, we should now own the lock (we should be at the tail of the list) BOOST_ASSERT( current_context->next_blocked_mutex == 0 ); - } catch ( ... ) { - wlog( "lock threw" ); + } + catch ( exception& e ) + { + wlog( "lock threw: ${e}", ("e", e)); + cleanup( *this, m_blist_lock, m_blist, current_context); + FC_RETHROW_EXCEPTION(e, warn, "lock threw: ${e}", ("e", e)); + } + catch ( ... ) + { + wlog( "lock threw unexpected exception" ); cleanup( *this, m_blist_lock, m_blist, current_context); throw; } diff --git a/tests/task_cancel.cpp b/tests/task_cancel.cpp index 0d6500f..0b9408e 100644 --- a/tests/task_cancel.cpp +++ b/tests/task_cancel.cpp @@ -39,7 +39,7 @@ BOOST_AUTO_TEST_CASE( cancel_task_blocked_on_mutex) BOOST_TEST_MESSAGE("--- In test_task, sleeps done, exiting"); }, "test_task"); fc::usleep(fc::seconds(3)); - test_task.cancel(); + //test_task.cancel(); try { test_task.wait(fc::seconds(1)); @@ -55,6 +55,8 @@ BOOST_AUTO_TEST_CASE( cancel_task_blocked_on_mutex) } BOOST_TEST_MESSAGE("Unlocking mutex locked from the main task so the test task will have the opportunity to lock it and be canceled"); } + fc::usleep(fc::seconds(3)); + test_task.cancel_and_wait(); } } From 8841f5e2719a24d249ea387f905dcbe56488a001 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Wed, 27 Aug 2014 12:20:19 -0400 Subject: [PATCH 5/6] Import thread/task_specific variables --- CMakeLists.txt | 1 + include/fc/thread/task.hpp | 27 +++++++++ include/fc/thread/thread.hpp | 14 +++++ include/fc/thread/thread_specific.hpp | 84 +++++++++++++++++++++++++++ src/thread/task.cpp | 15 +++++ src/thread/thread.cpp | 1 + src/thread/thread_d.hpp | 24 +++++++- src/thread/thread_specific.cpp | 65 +++++++++++++++++++++ 8 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 include/fc/thread/thread_specific.hpp create mode 100644 src/thread/thread_specific.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b37c4df..7a93881 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,7 @@ set( fc_sources src/exception.cpp src/variant_object.cpp src/thread/thread.cpp + src/thread/thread_specific.cpp src/thread/future.cpp src/thread/task.cpp src/thread/spin_lock.cpp diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index cef9dcd..7517cd1 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -8,6 +8,25 @@ namespace fc { struct context; class spin_lock; + namespace detail + { + struct specific_data_info + { + void* value; + void (*cleanup)(void*); + specific_data_info() : + value(0), + cleanup(0) + {} + specific_data_info(void* value, void (*cleanup)(void*)) : + value(value), + cleanup(cleanup) + {} + }; + void* get_task_specific_data(unsigned slot); + void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + } + class task_base : virtual public promise_base { public: void run(); @@ -23,6 +42,12 @@ namespace fc { context* _active_context; task_base* _next; + // support for task-specific data + std::vector *_task_specific_data; + + friend void* detail::get_task_specific_data(unsigned slot); + friend void detail::set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + task_base(void* func); // opaque internal / private data used by // thread/thread_private @@ -37,6 +62,8 @@ namespace fc { void (*_run_functor)(void*, void* ); void run_impl(); + + void cleanup_task_specific_data(); }; namespace detail { diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index 10fd05b..b7060d0 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -7,6 +7,15 @@ namespace fc { class time_point; class microseconds; + namespace detail + { + void* get_thread_specific_data(unsigned slot); + void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + unsigned get_next_unused_task_storage_slot(); + void* get_task_specific_data(unsigned slot); + void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + } + class thread { public: thread( const std::string& name = "" ); @@ -121,6 +130,11 @@ namespace fc { friend class promise_base; friend class thread_d; friend class mutex; + friend void* detail::get_thread_specific_data(unsigned slot); + friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + friend unsigned detail::get_next_unused_task_storage_slot(); + friend void* detail::get_task_specific_data(unsigned slot); + friend void detail::set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); #ifndef NDEBUG friend class non_preemptable_scope_check; #endif diff --git a/include/fc/thread/thread_specific.hpp b/include/fc/thread/thread_specific.hpp new file mode 100644 index 0000000..f62297d --- /dev/null +++ b/include/fc/thread/thread_specific.hpp @@ -0,0 +1,84 @@ +#pragma once +#include "thread.hpp" + +namespace fc +{ + namespace detail + { + unsigned get_next_unused_thread_storage_slot(); + unsigned get_next_unused_task_storage_slot(); + } + + template + class thread_specific_ptr + { + private: + unsigned slot; + public: + thread_specific_ptr() : + slot(detail::get_next_unused_thread_storage_slot()) + {} + + T* get() const + { + return static_cast(detail::get_thread_specific_data(slot)); + } + T* operator->() const + { + return get(); + } + T& operator*() const + { + return *get(); + } + operator bool() const + { + return get() != static_cast(0); + } + static void cleanup_function(void* obj) + { + delete static_cast(obj); + } + void reset(T* new_value = 0) + { + detail::set_thread_specific_data(slot, new_value, cleanup_function); + } + }; + + template + class task_specific_ptr + { + private: + unsigned slot; + public: + task_specific_ptr() : + slot(detail::get_next_unused_task_storage_slot()) + {} + + T* get() const + { + return static_cast(detail::get_task_specific_data(slot)); + } + T* operator->() const + { + return get(); + } + T& operator*() const + { + return *get(); + } + operator bool() const + { + return get() != static_cast(0); + } + static void cleanup_function(void* obj) + { + delete static_cast(obj); + } + void reset(T* new_value = 0) + { + detail::set_task_specific_data(slot, new_value, cleanup_function); + } + }; + +} diff --git a/src/thread/task.cpp b/src/thread/task.cpp index 3188145..f2759c9 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -20,6 +20,7 @@ namespace fc { _posted_num(0), _active_context(nullptr), _next(nullptr), + _task_specific_data(nullptr), _promise_impl(nullptr), _functor(func){ } @@ -63,6 +64,7 @@ namespace fc { } task_base::~task_base() { + cleanup_task_specific_data(); _destroy_functor( _functor ); } @@ -71,4 +73,17 @@ namespace fc { _active_context = c; } } + + void task_base::cleanup_task_specific_data() + { + if (_task_specific_data) + { + for (auto iter = _task_specific_data->begin(); iter != _task_specific_data->end(); ++iter) + if (iter->cleanup) + iter->cleanup(iter->value); + delete _task_specific_data; + _task_specific_data = nullptr; + } + } + } diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index bfd13fa..ae9486b 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -201,6 +201,7 @@ namespace fc { my->check_for_timeouts(); } my->clear_free_list(); + my->cleanup_thread_specific_data(); } void thread::exec() { diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index e156fe4..48a8879 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -26,7 +26,8 @@ namespace fc { pt_head(0), ready_head(0), ready_tail(0), - blocked(0) + blocked(0), + next_unused_task_storage_slot(0) #ifndef NDEBUG ,non_preemptable_scope_count(0) #endif @@ -87,6 +88,15 @@ namespace fc { fc::context* ready_tail; fc::context* blocked; + + // values for thread specific data objects for this thread + std::vector thread_specific_data; + // values for task_specific data for code executing on a thread that's + // not a task launched by async (usually the default task on the main + // thread in a process) + std::vector non_task_specific_data; + unsigned next_unused_task_storage_slot; + #ifndef NDEBUG unsigned non_preemptable_scope_count; #endif @@ -570,5 +580,17 @@ namespace fc { check_fiber_exceptions(); } + + void cleanup_thread_specific_data() + { + for (auto iter = non_task_specific_data.begin(); iter != non_task_specific_data.end(); ++iter) + if (iter->cleanup) + iter->cleanup(iter->value); + + for (auto iter = thread_specific_data.begin(); iter != thread_specific_data.end(); ++iter) + if (iter->cleanup) + iter->cleanup(iter->value); + } + }; } // namespace fc diff --git a/src/thread/thread_specific.cpp b/src/thread/thread_specific.cpp new file mode 100644 index 0000000..a91236d --- /dev/null +++ b/src/thread/thread_specific.cpp @@ -0,0 +1,65 @@ +#include +#include +#include "thread_d.hpp" +#include + +namespace fc +{ + namespace detail + { + boost::atomic thread_specific_slot_counter; + unsigned get_next_unused_thread_storage_slot() + { + return thread_specific_slot_counter.fetch_add(1); + } + + void* get_specific_data(std::vector *specific_data, unsigned slot) + { + return slot < specific_data->size() ? + (*specific_data)[slot].value : nullptr; + } + void set_specific_data(std::vector *specific_data, unsigned slot, void* new_value, void(*cleanup)(void*)) + { + if (slot + 1 > specific_data->size()) + specific_data->resize(slot + 1); + (*specific_data)[slot] = std::move(detail::specific_data_info(new_value, cleanup)); + } + + void* get_thread_specific_data(unsigned slot) + { + return get_specific_data(&thread::current().my->thread_specific_data, slot); + } + void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)) + { + return set_specific_data(&thread::current().my->thread_specific_data, slot, new_value, cleanup); + } + + unsigned get_next_unused_task_storage_slot() + { + return thread::current().my->next_unused_task_storage_slot++; + } + void* get_task_specific_data(unsigned slot) + { + context* current_context = thread::current().my->current; + if (!current_context || + !current_context->cur_task) + return get_specific_data(&thread::current().my->non_task_specific_data, slot); + if (current_context->cur_task->_task_specific_data) + return get_specific_data(current_context->cur_task->_task_specific_data, slot); + return nullptr; + } + void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)) + { + context* current_context = thread::current().my->current; + if (!current_context || + !current_context->cur_task) + set_specific_data(&thread::current().my->non_task_specific_data, slot, new_value, cleanup); + else + { + if (!current_context->cur_task->_task_specific_data) + current_context->cur_task->_task_specific_data = new std::vector; + set_specific_data(current_context->cur_task->_task_specific_data, slot, new_value, cleanup); + } + } + } +} // end namespace fc \ No newline at end of file From ac385d1f6b0e7e073c5337e27fcbd9d990a31eae Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Wed, 27 Aug 2014 14:07:44 -0400 Subject: [PATCH 6/6] Allow the user to supply a reason string when canceling a task (useful for debugging) --- include/fc/thread/future.hpp | 22 +++++++++++++++------- include/fc/thread/task.hpp | 6 +++--- src/log/file_appender.cpp | 2 +- src/network/ntp.cpp | 4 ++-- src/network/udt_socket.cpp | 2 +- src/rpc/json_connection.cpp | 2 +- src/thread/context.hpp | 9 +++++++++ src/thread/future.cpp | 8 +++++++- src/thread/task.cpp | 7 +++++-- tests/task_cancel.cpp | 10 +++++----- 10 files changed, 49 insertions(+), 23 deletions(-) diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index 3e544ee..7233274 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -13,6 +13,13 @@ # define FC_TASK_NAME_DEFAULT_ARG = "?" #endif +#define FC_CANCELATION_REASONS_ARE_MANDATORY 1 +#ifdef FC_CANCELATION_REASONS_ARE_MANDATORY +# define FC_CANCELATION_REASON_DEFAULT_ARG +#else +# define FC_CANCELATION_REASON_DEFAULT_ARG = nullptr +#endif + namespace fc { class abstract_thread; struct void_t{}; @@ -58,7 +65,7 @@ namespace fc { const char* get_desc()const; - virtual void cancel(); + virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG); bool canceled()const { return _canceled; } bool ready()const; bool error()const; @@ -91,6 +98,7 @@ namespace fc { time_point _timeout; fc::exception_ptr _exceptp; bool _canceled; + const char* _cancellation_reason; const char* _desc; detail::completion_handler* _compl; }; @@ -210,14 +218,14 @@ namespace fc { /// @pre valid() bool error()const { return m_prom->error(); } - void cancel()const { if( m_prom ) m_prom->cancel(); } + void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) const { if( m_prom ) m_prom->cancel(reason); } bool canceled()const { if( m_prom ) return m_prom->canceled(); else return true;} - void cancel_and_wait() + void cancel_and_wait(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) { if( valid() ) { - cancel(); + cancel(reason); try { wait(); @@ -276,9 +284,9 @@ namespace fc { bool valid()const { return !!m_prom; } bool canceled()const { return m_prom ? m_prom->canceled() : true; } - void cancel_and_wait() + void cancel_and_wait(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) { - cancel(); + cancel(reason); try { wait(); @@ -294,7 +302,7 @@ namespace fc { /// @pre valid() bool error()const { return m_prom->error(); } - void cancel()const { if( m_prom ) m_prom->cancel(); } + void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) const { if( m_prom ) m_prom->cancel(reason); } template void on_complete( CompletionHandler&& c ) { diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index 7517cd1..dd52b77 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -30,7 +30,7 @@ namespace fc { class task_base : virtual public promise_base { public: void run(); - virtual void cancel() override; + virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override; protected: ~task_base(); @@ -99,7 +99,7 @@ namespace fc { _promise_impl = static_cast*>(this); _run_functor = &detail::functor_run::run; } - virtual void cancel() override { task_base::cancel(); } + virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); } aligned _functor; private: @@ -119,7 +119,7 @@ namespace fc { _promise_impl = static_cast*>(this); _run_functor = &detail::void_functor_run::run; } - virtual void cancel() override { task_base::cancel(); } + virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); } aligned _functor; private: diff --git a/src/log/file_appender.cpp b/src/log/file_appender.cpp index 92c2848..e44d060 100644 --- a/src/log/file_appender.cpp +++ b/src/log/file_appender.cpp @@ -82,7 +82,7 @@ namespace fc { { try { - _rotation_task.cancel_and_wait(); + _rotation_task.cancel_and_wait("file_appender is destructing"); } catch( ... ) { diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 9febe47..31d8b87 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -139,7 +139,7 @@ namespace fc my->_ntp_thread.async([=](){ try { - my->_request_time_task_done.cancel_and_wait(); + my->_request_time_task_done.cancel_and_wait("ntp object is destructing"); } catch ( const fc::exception& e ) { @@ -152,7 +152,7 @@ namespace fc try { - my->_read_loop_done.cancel(); + my->_read_loop_done.cancel("ntp object is destructing"); my->_sock.close(); my->_read_loop_done.wait(); } diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp index 0018238..ee4dbb5 100644 --- a/src/network/udt_socket.cpp +++ b/src/network/udt_socket.cpp @@ -36,7 +36,7 @@ namespace fc { ~udt_epoll_service() { - _epoll_loop.cancel(); + _epoll_loop.cancel("udt_epoll_service is destructing"); _epoll_loop.wait(); UDT::cleanup(); } diff --git a/src/rpc/json_connection.cpp b/src/rpc/json_connection.cpp index 06d3fa4..2d1014f 100644 --- a/src/rpc/json_connection.cpp +++ b/src/rpc/json_connection.cpp @@ -243,7 +243,7 @@ namespace fc { namespace rpc { { if( my->_done.valid() && !my->_done.ready() ) { - my->_done.cancel(); + my->_done.cancel("json_connection is destructing"); my->_out->close(); my->_done.wait(); } diff --git a/src/thread/context.hpp b/src/thread/context.hpp index 4ab0dd4..ecd360f 100644 --- a/src/thread/context.hpp +++ b/src/thread/context.hpp @@ -48,6 +48,9 @@ namespace fc { next(0), ctx_thread(t), canceled(false), +#ifndef NDEBUG + cancellation_reason(nullptr), +#endif complete(false), cur_task(0) { @@ -78,6 +81,9 @@ namespace fc { next(0), ctx_thread(t), canceled(false), +#ifndef NDEBUG + cancellation_reason(nullptr), +#endif complete(false), cur_task(0) {} @@ -192,6 +198,9 @@ namespace fc { fc::context* next; fc::thread* ctx_thread; bool canceled; +#ifndef NDEBUG + const char* cancellation_reason; +#endif bool complete; task_base* cur_task; }; diff --git a/src/thread/future.cpp b/src/thread/future.cpp index 9ccc47f..9ad6648 100644 --- a/src/thread/future.cpp +++ b/src/thread/future.cpp @@ -17,6 +17,9 @@ namespace fc { #endif _timeout(time_point::maximum()), _canceled(false), +#ifndef NDEBUG + _cancellation_reason(nullptr), +#endif _desc(desc), _compl(nullptr) { } @@ -25,9 +28,12 @@ namespace fc { return _desc; } - void promise_base::cancel(){ + void promise_base::cancel(const char* reason /* = nullptr */){ // wlog("${desc} canceled!", ("desc", _desc? _desc : "")); _canceled = true; +#ifndef NDEBUG + _cancellation_reason = reason; +#endif } bool promise_base::ready()const { return _ready; diff --git a/src/thread/task.cpp b/src/thread/task.cpp index f2759c9..46af23f 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -54,12 +54,15 @@ namespace fc { } } - void task_base::cancel() + void task_base::cancel(const char* reason /* = nullptr */) { - promise_base::cancel(); + promise_base::cancel(reason); if (_active_context) { _active_context->canceled = true; +#ifndef NDEBUG + _active_context->cancellation_reason = reason; +#endif } } diff --git a/tests/task_cancel.cpp b/tests/task_cancel.cpp index 0b9408e..ff1d51d 100644 --- a/tests/task_cancel.cpp +++ b/tests/task_cancel.cpp @@ -18,7 +18,7 @@ BOOST_AUTO_TEST_CASE( leave_mutex_locked ) fc::usleep(fc::seconds(1)); }, "test_task"); fc::usleep(fc::seconds(3)); - test_task.cancel_and_wait(); + test_task.cancel_and_wait("cancel called directly by test"); } BOOST_TEST_PASSPOINT(); } @@ -57,7 +57,7 @@ BOOST_AUTO_TEST_CASE( cancel_task_blocked_on_mutex) } fc::usleep(fc::seconds(3)); - test_task.cancel_and_wait(); + test_task.cancel_and_wait("cleaning up test"); } } @@ -115,7 +115,7 @@ BOOST_AUTO_TEST_CASE( cancel_an_active_task ) fc::usleep(fc::milliseconds(100)); BOOST_TEST_MESSAGE("Canceling task"); - task.cancel(); + task.cancel("canceling to test if cancel works"); try { task_result result = task.wait(); @@ -149,7 +149,7 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task ) BOOST_CHECK_MESSAGE(!weak_string_ptr.expired(), "Weak pointer should still be valid because async task should be holding the strong pointer"); fc::usleep(fc::milliseconds(100)); BOOST_TEST_MESSAGE("Canceling task"); - task.cancel(); + task.cancel("canceling to test if cancel works"); try { task.wait(); @@ -181,7 +181,7 @@ BOOST_AUTO_TEST_CASE( cancel_scheduled_task ) simple_task(); simple_task(); fc::usleep(fc::seconds(4)); - simple_task_done.cancel(); + simple_task_done.cancel("canceling scheduled task to test if cancel works"); simple_task_done.wait(); } catch ( const fc::exception& e )