Merge branch 'log_task_names'
This commit is contained in:
commit
477d3397f6
19 changed files with 420 additions and 67 deletions
|
|
@ -96,6 +96,7 @@ set( fc_sources
|
|||
src/exception.cpp
|
||||
src/variant_object.cpp
|
||||
src/thread/thread.cpp
|
||||
src/thread/thread_specific.cpp
|
||||
src/thread/future.cpp
|
||||
src/thread/task.cpp
|
||||
src/thread/spin_lock.cpp
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ namespace fc
|
|||
uint64_t get_line_number()const;
|
||||
string get_method()const;
|
||||
string get_thread_name()const;
|
||||
string get_task_name()const;
|
||||
string get_host_name()const;
|
||||
time_point get_timestamp()const;
|
||||
log_level get_log_level()const;
|
||||
|
|
|
|||
|
|
@ -13,6 +13,13 @@
|
|||
# define FC_TASK_NAME_DEFAULT_ARG = "?"
|
||||
#endif
|
||||
|
||||
#define FC_CANCELATION_REASONS_ARE_MANDATORY 1
|
||||
#ifdef FC_CANCELATION_REASONS_ARE_MANDATORY
|
||||
# define FC_CANCELATION_REASON_DEFAULT_ARG
|
||||
#else
|
||||
# define FC_CANCELATION_REASON_DEFAULT_ARG = nullptr
|
||||
#endif
|
||||
|
||||
namespace fc {
|
||||
class abstract_thread;
|
||||
struct void_t{};
|
||||
|
|
@ -58,7 +65,7 @@ namespace fc {
|
|||
|
||||
const char* get_desc()const;
|
||||
|
||||
virtual void cancel();
|
||||
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG);
|
||||
bool canceled()const { return _canceled; }
|
||||
bool ready()const;
|
||||
bool error()const;
|
||||
|
|
@ -91,6 +98,7 @@ namespace fc {
|
|||
time_point _timeout;
|
||||
fc::exception_ptr _exceptp;
|
||||
bool _canceled;
|
||||
const char* _cancellation_reason;
|
||||
const char* _desc;
|
||||
detail::completion_handler* _compl;
|
||||
};
|
||||
|
|
@ -210,14 +218,14 @@ namespace fc {
|
|||
/// @pre valid()
|
||||
bool error()const { return m_prom->error(); }
|
||||
|
||||
void cancel()const { if( m_prom ) m_prom->cancel(); }
|
||||
void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) const { if( m_prom ) m_prom->cancel(reason); }
|
||||
bool canceled()const { if( m_prom ) return m_prom->canceled(); else return true;}
|
||||
|
||||
void cancel_and_wait()
|
||||
void cancel_and_wait(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG)
|
||||
{
|
||||
if( valid() )
|
||||
{
|
||||
cancel();
|
||||
cancel(reason);
|
||||
try
|
||||
{
|
||||
wait();
|
||||
|
|
@ -276,9 +284,9 @@ namespace fc {
|
|||
bool valid()const { return !!m_prom; }
|
||||
bool canceled()const { return m_prom ? m_prom->canceled() : true; }
|
||||
|
||||
void cancel_and_wait()
|
||||
void cancel_and_wait(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG)
|
||||
{
|
||||
cancel();
|
||||
cancel(reason);
|
||||
try
|
||||
{
|
||||
wait();
|
||||
|
|
@ -294,7 +302,7 @@ namespace fc {
|
|||
/// @pre valid()
|
||||
bool error()const { return m_prom->error(); }
|
||||
|
||||
void cancel()const { if( m_prom ) m_prom->cancel(); }
|
||||
void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) const { if( m_prom ) m_prom->cancel(reason); }
|
||||
|
||||
template<typename CompletionHandler>
|
||||
void on_complete( CompletionHandler&& c ) {
|
||||
|
|
|
|||
|
|
@ -8,10 +8,29 @@ namespace fc {
|
|||
struct context;
|
||||
class spin_lock;
|
||||
|
||||
namespace detail
|
||||
{
|
||||
struct specific_data_info
|
||||
{
|
||||
void* value;
|
||||
void (*cleanup)(void*);
|
||||
specific_data_info() :
|
||||
value(0),
|
||||
cleanup(0)
|
||||
{}
|
||||
specific_data_info(void* value, void (*cleanup)(void*)) :
|
||||
value(value),
|
||||
cleanup(cleanup)
|
||||
{}
|
||||
};
|
||||
void* get_task_specific_data(unsigned slot);
|
||||
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
|
||||
}
|
||||
|
||||
class task_base : virtual public promise_base {
|
||||
public:
|
||||
void run();
|
||||
virtual void cancel() override;
|
||||
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override;
|
||||
|
||||
protected:
|
||||
~task_base();
|
||||
|
|
@ -23,6 +42,12 @@ namespace fc {
|
|||
context* _active_context;
|
||||
task_base* _next;
|
||||
|
||||
// support for task-specific data
|
||||
std::vector<detail::specific_data_info> *_task_specific_data;
|
||||
|
||||
friend void* detail::get_task_specific_data(unsigned slot);
|
||||
friend void detail::set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
|
||||
|
||||
task_base(void* func);
|
||||
// opaque internal / private data used by
|
||||
// thread/thread_private
|
||||
|
|
@ -37,6 +62,8 @@ namespace fc {
|
|||
void (*_run_functor)(void*, void* );
|
||||
|
||||
void run_impl();
|
||||
|
||||
void cleanup_task_specific_data();
|
||||
};
|
||||
|
||||
namespace detail {
|
||||
|
|
@ -72,7 +99,7 @@ namespace fc {
|
|||
_promise_impl = static_cast<promise<R>*>(this);
|
||||
_run_functor = &detail::functor_run<FunctorType>::run;
|
||||
}
|
||||
virtual void cancel() override { task_base::cancel(); }
|
||||
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); }
|
||||
|
||||
aligned<FunctorSize> _functor;
|
||||
private:
|
||||
|
|
@ -92,7 +119,7 @@ namespace fc {
|
|||
_promise_impl = static_cast<promise<void>*>(this);
|
||||
_run_functor = &detail::void_functor_run<FunctorType>::run;
|
||||
}
|
||||
virtual void cancel() override { task_base::cancel(); }
|
||||
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); }
|
||||
|
||||
aligned<FunctorSize> _functor;
|
||||
private:
|
||||
|
|
|
|||
|
|
@ -7,6 +7,15 @@ namespace fc {
|
|||
class time_point;
|
||||
class microseconds;
|
||||
|
||||
namespace detail
|
||||
{
|
||||
void* get_thread_specific_data(unsigned slot);
|
||||
void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
|
||||
unsigned get_next_unused_task_storage_slot();
|
||||
void* get_task_specific_data(unsigned slot);
|
||||
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
|
||||
}
|
||||
|
||||
class thread {
|
||||
public:
|
||||
thread( const std::string& name = "" );
|
||||
|
|
@ -32,6 +41,8 @@ namespace fc {
|
|||
*/
|
||||
void set_name( const string& n );
|
||||
|
||||
const char* current_task_desc() const;
|
||||
|
||||
/**
|
||||
* @brief print debug info about the state of every context / promise.
|
||||
*
|
||||
|
|
@ -119,6 +130,11 @@ namespace fc {
|
|||
friend class promise_base;
|
||||
friend class thread_d;
|
||||
friend class mutex;
|
||||
friend void* detail::get_thread_specific_data(unsigned slot);
|
||||
friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
|
||||
friend unsigned detail::get_next_unused_task_storage_slot();
|
||||
friend void* detail::get_task_specific_data(unsigned slot);
|
||||
friend void detail::set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
|
||||
#ifndef NDEBUG
|
||||
friend class non_preemptable_scope_check;
|
||||
#endif
|
||||
|
|
|
|||
84
include/fc/thread/thread_specific.hpp
Normal file
84
include/fc/thread/thread_specific.hpp
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
#pragma once
|
||||
#include "thread.hpp"
|
||||
|
||||
namespace fc
|
||||
{
|
||||
namespace detail
|
||||
{
|
||||
unsigned get_next_unused_thread_storage_slot();
|
||||
unsigned get_next_unused_task_storage_slot();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
class thread_specific_ptr
|
||||
{
|
||||
private:
|
||||
unsigned slot;
|
||||
public:
|
||||
thread_specific_ptr() :
|
||||
slot(detail::get_next_unused_thread_storage_slot())
|
||||
{}
|
||||
|
||||
T* get() const
|
||||
{
|
||||
return static_cast<T*>(detail::get_thread_specific_data(slot));
|
||||
}
|
||||
T* operator->() const
|
||||
{
|
||||
return get();
|
||||
}
|
||||
T& operator*() const
|
||||
{
|
||||
return *get();
|
||||
}
|
||||
operator bool() const
|
||||
{
|
||||
return get() != static_cast<T*>(0);
|
||||
}
|
||||
static void cleanup_function(void* obj)
|
||||
{
|
||||
delete static_cast<T*>(obj);
|
||||
}
|
||||
void reset(T* new_value = 0)
|
||||
{
|
||||
detail::set_thread_specific_data(slot, new_value, cleanup_function);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class task_specific_ptr
|
||||
{
|
||||
private:
|
||||
unsigned slot;
|
||||
public:
|
||||
task_specific_ptr() :
|
||||
slot(detail::get_next_unused_task_storage_slot())
|
||||
{}
|
||||
|
||||
T* get() const
|
||||
{
|
||||
return static_cast<T*>(detail::get_task_specific_data(slot));
|
||||
}
|
||||
T* operator->() const
|
||||
{
|
||||
return get();
|
||||
}
|
||||
T& operator*() const
|
||||
{
|
||||
return *get();
|
||||
}
|
||||
operator bool() const
|
||||
{
|
||||
return get() != static_cast<T*>(0);
|
||||
}
|
||||
static void cleanup_function(void* obj)
|
||||
{
|
||||
delete static_cast<T*>(obj);
|
||||
}
|
||||
void reset(T* new_value = 0)
|
||||
{
|
||||
detail::set_task_specific_data(slot, new_value, cleanup_function);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
|
@ -82,7 +82,7 @@ namespace fc {
|
|||
{
|
||||
try
|
||||
{
|
||||
_rotation_task.cancel_and_wait();
|
||||
_rotation_task.cancel_and_wait("file_appender is destructing");
|
||||
}
|
||||
catch( ... )
|
||||
{
|
||||
|
|
@ -192,7 +192,7 @@ namespace fc {
|
|||
std::stringstream line;
|
||||
//line << (m.get_context().get_timestamp().time_since_epoch().count() % (1000ll*1000ll*60ll*60))/1000 <<"ms ";
|
||||
line << std::string(m.get_context().get_timestamp()) << " ";
|
||||
line << std::setw( 10 ) << m.get_context().get_thread_name().substr(0,9).c_str() <<" ";
|
||||
line << std::setw( 21 ) << (m.get_context().get_thread_name().substr(0,9) + string(":") + m.get_context().get_task_name()).c_str() <<" ";
|
||||
|
||||
auto me = m.get_context().get_method();
|
||||
// strip all leading scopes...
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ namespace fc
|
|||
uint64_t line;
|
||||
string method;
|
||||
string thread_name;
|
||||
string task_name;
|
||||
string hostname;
|
||||
string context;
|
||||
time_point timestamp;
|
||||
|
|
@ -53,6 +54,8 @@ namespace fc
|
|||
my->method = method;
|
||||
my->timestamp = time_point::now();
|
||||
my->thread_name = fc::thread::current().name();
|
||||
const char* current_task_desc = fc::thread::current().current_task_desc();
|
||||
my->task_name = current_task_desc ? current_task_desc : "?unnamed?";
|
||||
}
|
||||
|
||||
log_context::log_context( const variant& v )
|
||||
|
|
@ -65,6 +68,8 @@ namespace fc
|
|||
my->method = obj["method"].as_string();
|
||||
my->hostname = obj["hostname"].as_string();
|
||||
my->thread_name = obj["thread_name"].as_string();
|
||||
if (obj.contains("task_name"))
|
||||
my->task_name = obj["task_name"].as_string();
|
||||
my->timestamp = obj["timestamp"].as<time_point>();
|
||||
if( obj.contains( "context" ) )
|
||||
my->context = obj["context"].as<string>();
|
||||
|
|
@ -149,6 +154,7 @@ namespace fc
|
|||
uint64_t log_context::get_line_number()const { return my->line; }
|
||||
string log_context::get_method()const { return my->method; }
|
||||
string log_context::get_thread_name()const { return my->thread_name; }
|
||||
string log_context::get_task_name()const { return my->task_name; }
|
||||
string log_context::get_host_name()const { return my->hostname; }
|
||||
time_point log_context::get_timestamp()const { return my->timestamp; }
|
||||
log_level log_context::get_log_level()const{ return my->level; }
|
||||
|
|
|
|||
|
|
@ -60,6 +60,10 @@ namespace fc
|
|||
break;
|
||||
}
|
||||
}
|
||||
catch (const fc::canceled_exception&)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
// this could fail to resolve but we want to go on to other hosts..
|
||||
catch ( const fc::exception& e )
|
||||
{
|
||||
|
|
@ -135,7 +139,7 @@ namespace fc
|
|||
my->_ntp_thread.async([=](){
|
||||
try
|
||||
{
|
||||
my->_request_time_task_done.cancel_and_wait();
|
||||
my->_request_time_task_done.cancel_and_wait("ntp object is destructing");
|
||||
}
|
||||
catch ( const fc::exception& e )
|
||||
{
|
||||
|
|
@ -148,7 +152,7 @@ namespace fc
|
|||
|
||||
try
|
||||
{
|
||||
my->_read_loop_done.cancel();
|
||||
my->_read_loop_done.cancel("ntp object is destructing");
|
||||
my->_sock.close();
|
||||
my->_read_loop_done.wait();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ namespace fc {
|
|||
|
||||
~udt_epoll_service()
|
||||
{
|
||||
_epoll_loop.cancel();
|
||||
_epoll_loop.cancel("udt_epoll_service is destructing");
|
||||
_epoll_loop.wait();
|
||||
UDT::cleanup();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -243,7 +243,7 @@ namespace fc { namespace rpc {
|
|||
{
|
||||
if( my->_done.valid() && !my->_done.ready() )
|
||||
{
|
||||
my->_done.cancel();
|
||||
my->_done.cancel("json_connection is destructing");
|
||||
my->_out->close();
|
||||
my->_done.wait();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,6 +48,9 @@ namespace fc {
|
|||
next(0),
|
||||
ctx_thread(t),
|
||||
canceled(false),
|
||||
#ifndef NDEBUG
|
||||
cancellation_reason(nullptr),
|
||||
#endif
|
||||
complete(false),
|
||||
cur_task(0)
|
||||
{
|
||||
|
|
@ -78,6 +81,9 @@ namespace fc {
|
|||
next(0),
|
||||
ctx_thread(t),
|
||||
canceled(false),
|
||||
#ifndef NDEBUG
|
||||
cancellation_reason(nullptr),
|
||||
#endif
|
||||
complete(false),
|
||||
cur_task(0)
|
||||
{}
|
||||
|
|
@ -192,6 +198,9 @@ namespace fc {
|
|||
fc::context* next;
|
||||
fc::thread* ctx_thread;
|
||||
bool canceled;
|
||||
#ifndef NDEBUG
|
||||
const char* cancellation_reason;
|
||||
#endif
|
||||
bool complete;
|
||||
task_base* cur_task;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@ namespace fc {
|
|||
#endif
|
||||
_timeout(time_point::maximum()),
|
||||
_canceled(false),
|
||||
#ifndef NDEBUG
|
||||
_cancellation_reason(nullptr),
|
||||
#endif
|
||||
_desc(desc),
|
||||
_compl(nullptr)
|
||||
{ }
|
||||
|
|
@ -25,9 +28,12 @@ namespace fc {
|
|||
return _desc;
|
||||
}
|
||||
|
||||
void promise_base::cancel(){
|
||||
void promise_base::cancel(const char* reason /* = nullptr */){
|
||||
// wlog("${desc} canceled!", ("desc", _desc? _desc : ""));
|
||||
_canceled = true;
|
||||
#ifndef NDEBUG
|
||||
_cancellation_reason = reason;
|
||||
#endif
|
||||
}
|
||||
bool promise_base::ready()const {
|
||||
return _ready;
|
||||
|
|
@ -44,13 +50,16 @@ namespace fc {
|
|||
}
|
||||
|
||||
void promise_base::_wait( const microseconds& timeout_us ){
|
||||
if( timeout_us == microseconds::maximum() ) _wait_until( time_point::maximum() );
|
||||
else _wait_until( time_point::now() + timeout_us );
|
||||
if( timeout_us == microseconds::maximum() )
|
||||
_wait_until( time_point::maximum() );
|
||||
else
|
||||
_wait_until( time_point::now() + timeout_us );
|
||||
}
|
||||
void promise_base::_wait_until( const time_point& timeout_us ){
|
||||
{ synchronized(_spin_yield)
|
||||
if( _ready ) {
|
||||
if( _exceptp ) _exceptp->dynamic_rethrow_exception();
|
||||
if( _exceptp )
|
||||
_exceptp->dynamic_rethrow_exception();
|
||||
return;
|
||||
}
|
||||
_enqueue_thread();
|
||||
|
|
|
|||
|
|
@ -11,44 +11,53 @@ namespace fc {
|
|||
:m_blist(0){}
|
||||
|
||||
mutex::~mutex() {
|
||||
if( m_blist ) {
|
||||
auto c = m_blist;
|
||||
if( m_blist )
|
||||
{
|
||||
context* c = m_blist;
|
||||
fc::thread::current().debug("~mutex");
|
||||
#if 0
|
||||
while( c ) {
|
||||
// elog( "still blocking on context %p (%s)", m_blist, (m_blist->cur_task ? m_blist->cur_task->get_desc() : "no current task") );
|
||||
c = c->next_blocked_mutex;
|
||||
}
|
||||
#endif
|
||||
BOOST_ASSERT( false && "Attempt to free mutex while others are blocking on lock." );
|
||||
}
|
||||
BOOST_ASSERT( !m_blist && "Attempt to free mutex while others are blocking on lock." );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param next - is set to the next context to get the lock.
|
||||
* @param last_context - is set to the next context to get the lock (the next-to-last element of the list)
|
||||
* @return the last context (the one with the lock)
|
||||
*/
|
||||
static fc::context* get_tail( fc::context* h, fc::context*& next ) {
|
||||
next = 0;
|
||||
fc::context* n = h;
|
||||
if( !n ) return n;
|
||||
while( n->next_blocked_mutex ) {
|
||||
next = n;
|
||||
n=n->next_blocked_mutex;
|
||||
static fc::context* get_tail( fc::context* list_head, fc::context*& context_to_unblock ) {
|
||||
context_to_unblock = 0;
|
||||
fc::context* list_context_iter = list_head;
|
||||
if( !list_context_iter )
|
||||
return list_context_iter;
|
||||
while( list_context_iter->next_blocked_mutex )
|
||||
{
|
||||
context_to_unblock = list_context_iter;
|
||||
list_context_iter = list_context_iter->next_blocked_mutex;
|
||||
}
|
||||
return n;
|
||||
return list_context_iter;
|
||||
}
|
||||
|
||||
static fc::context* remove( fc::context* head, fc::context* target ) {
|
||||
fc::context* c = head;
|
||||
fc::context* p = 0;
|
||||
while( c ) {
|
||||
if( c == target ) {
|
||||
if( p ) {
|
||||
p->next_blocked_mutex = c->next_blocked_mutex;
|
||||
fc::context* context_iter = head;
|
||||
fc::context* previous = 0;
|
||||
while( context_iter )
|
||||
{
|
||||
if( context_iter == target )
|
||||
{
|
||||
if( previous )
|
||||
{
|
||||
previous->next_blocked_mutex = context_iter->next_blocked_mutex;
|
||||
return head;
|
||||
}
|
||||
return c->next_blocked_mutex;
|
||||
return context_iter->next_blocked_mutex;
|
||||
}
|
||||
p = c;
|
||||
c = c->next_blocked_mutex;
|
||||
previous = context_iter;
|
||||
context_iter = context_iter->next_blocked_mutex;
|
||||
}
|
||||
return head;
|
||||
}
|
||||
|
|
@ -56,7 +65,8 @@ namespace fc {
|
|||
{
|
||||
fc::unique_lock<fc::spin_yield_lock> lock(syl);
|
||||
if( cc->next_blocked_mutex ) {
|
||||
bl = remove(bl, cc );
|
||||
bl = remove(bl, cc);
|
||||
cc->next_blocked_mutex = nullptr;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
@ -114,17 +124,25 @@ namespace fc {
|
|||
}
|
||||
|
||||
void mutex::lock() {
|
||||
fc::context* n = 0;
|
||||
fc::context* cc = fc::thread::current().my->current;
|
||||
fc::context* current_context = fc::thread::current().my->current;
|
||||
if( !current_context )
|
||||
current_context = fc::thread::current().my->current = new fc::context( &fc::thread::current() );
|
||||
|
||||
{
|
||||
fc::unique_lock<fc::spin_yield_lock> lock(m_blist_lock);
|
||||
if( !m_blist ) {
|
||||
m_blist = cc;
|
||||
if( !m_blist )
|
||||
{
|
||||
// nobody else owns the mutex, so we get it; add our context as the last and only element on the mutex's list
|
||||
m_blist = current_context;
|
||||
assert(!current_context->next_blocked_mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
// allow recusive locks
|
||||
if ( get_tail( m_blist, n ) == cc ) {
|
||||
fc::context* dummy_context_to_unblock = 0;
|
||||
if ( get_tail( m_blist, dummy_context_to_unblock ) == current_context ) {
|
||||
// if we already have the lock (meaning we're on the tail of the list) then
|
||||
// we shouldn't be trying to grab the lock again
|
||||
assert(false);
|
||||
// EMF: I think recursive locks are currently broken -- we need to
|
||||
// keep track of how many times this mutex has been locked by the
|
||||
|
|
@ -132,9 +150,11 @@ namespace fc {
|
|||
// the next context only if the count drops to zero
|
||||
return;
|
||||
}
|
||||
cc->next_blocked_mutex = m_blist;
|
||||
m_blist = cc;
|
||||
// add ourselves to the head of the list
|
||||
current_context->next_blocked_mutex = m_blist;
|
||||
m_blist = current_context;
|
||||
|
||||
#if 0
|
||||
int cnt = 0;
|
||||
auto i = m_blist;
|
||||
while( i ) {
|
||||
|
|
@ -142,25 +162,36 @@ namespace fc {
|
|||
++cnt;
|
||||
}
|
||||
//wlog( "wait queue len %1%", cnt );
|
||||
#endif
|
||||
}
|
||||
|
||||
try {
|
||||
try
|
||||
{
|
||||
fc::thread::current().yield(false);
|
||||
BOOST_ASSERT( cc->next_blocked_mutex == 0 );
|
||||
} catch ( ... ) {
|
||||
wlog( "lock threw" );
|
||||
cleanup( *this, m_blist_lock, m_blist, cc);
|
||||
// if yield() returned normally, we should now own the lock (we should be at the tail of the list)
|
||||
BOOST_ASSERT( current_context->next_blocked_mutex == 0 );
|
||||
}
|
||||
catch ( exception& e )
|
||||
{
|
||||
wlog( "lock threw: ${e}", ("e", e));
|
||||
cleanup( *this, m_blist_lock, m_blist, current_context);
|
||||
FC_RETHROW_EXCEPTION(e, warn, "lock threw: ${e}", ("e", e));
|
||||
}
|
||||
catch ( ... )
|
||||
{
|
||||
wlog( "lock threw unexpected exception" );
|
||||
cleanup( *this, m_blist_lock, m_blist, current_context);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void mutex::unlock() {
|
||||
fc::context* next = 0;
|
||||
fc::context* context_to_unblock = 0;
|
||||
{ fc::unique_lock<fc::spin_yield_lock> lock(m_blist_lock);
|
||||
get_tail(m_blist, next);
|
||||
if( next ) {
|
||||
next->next_blocked_mutex = 0;
|
||||
next->ctx_thread->my->unblock( next );
|
||||
get_tail(m_blist, context_to_unblock);
|
||||
if( context_to_unblock ) {
|
||||
context_to_unblock->next_blocked_mutex = 0;
|
||||
context_to_unblock->ctx_thread->my->unblock( context_to_unblock );
|
||||
} else {
|
||||
m_blist = 0;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ namespace fc {
|
|||
_posted_num(0),
|
||||
_active_context(nullptr),
|
||||
_next(nullptr),
|
||||
_task_specific_data(nullptr),
|
||||
_promise_impl(nullptr),
|
||||
_functor(func){
|
||||
}
|
||||
|
|
@ -53,16 +54,20 @@ namespace fc {
|
|||
}
|
||||
}
|
||||
|
||||
void task_base::cancel()
|
||||
void task_base::cancel(const char* reason /* = nullptr */)
|
||||
{
|
||||
promise_base::cancel();
|
||||
promise_base::cancel(reason);
|
||||
if (_active_context)
|
||||
{
|
||||
_active_context->canceled = true;
|
||||
#ifndef NDEBUG
|
||||
_active_context->cancellation_reason = reason;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
task_base::~task_base() {
|
||||
cleanup_task_specific_data();
|
||||
_destroy_functor( _functor );
|
||||
}
|
||||
|
||||
|
|
@ -71,4 +76,17 @@ namespace fc {
|
|||
_active_context = c;
|
||||
}
|
||||
}
|
||||
|
||||
void task_base::cleanup_task_specific_data()
|
||||
{
|
||||
if (_task_specific_data)
|
||||
{
|
||||
for (auto iter = _task_specific_data->begin(); iter != _task_specific_data->end(); ++iter)
|
||||
if (iter->cleanup)
|
||||
iter->cleanup(iter->value);
|
||||
delete _task_specific_data;
|
||||
_task_specific_data = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -123,6 +123,14 @@ namespace fc {
|
|||
}
|
||||
const string& thread::name()const { return my->name; }
|
||||
void thread::set_name( const fc::string& n ) { my->name = n; }
|
||||
|
||||
const char* thread::current_task_desc() const
|
||||
{
|
||||
if (my->current && my->current->cur_task)
|
||||
return my->current->cur_task->get_desc();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void thread::debug( const fc::string& d ) { /*my->debug(d);*/ }
|
||||
|
||||
void thread::quit() {
|
||||
|
|
@ -193,6 +201,7 @@ namespace fc {
|
|||
my->check_for_timeouts();
|
||||
}
|
||||
my->clear_free_list();
|
||||
my->cleanup_thread_specific_data();
|
||||
}
|
||||
|
||||
void thread::exec() {
|
||||
|
|
|
|||
|
|
@ -26,7 +26,8 @@ namespace fc {
|
|||
pt_head(0),
|
||||
ready_head(0),
|
||||
ready_tail(0),
|
||||
blocked(0)
|
||||
blocked(0),
|
||||
next_unused_task_storage_slot(0)
|
||||
#ifndef NDEBUG
|
||||
,non_preemptable_scope_count(0)
|
||||
#endif
|
||||
|
|
@ -87,6 +88,15 @@ namespace fc {
|
|||
fc::context* ready_tail;
|
||||
|
||||
fc::context* blocked;
|
||||
|
||||
// values for thread specific data objects for this thread
|
||||
std::vector<detail::specific_data_info> thread_specific_data;
|
||||
// values for task_specific data for code executing on a thread that's
|
||||
// not a task launched by async (usually the default task on the main
|
||||
// thread in a process)
|
||||
std::vector<detail::specific_data_info> non_task_specific_data;
|
||||
unsigned next_unused_task_storage_slot;
|
||||
|
||||
#ifndef NDEBUG
|
||||
unsigned non_preemptable_scope_count;
|
||||
#endif
|
||||
|
|
@ -570,5 +580,17 @@ namespace fc {
|
|||
|
||||
check_fiber_exceptions();
|
||||
}
|
||||
|
||||
void cleanup_thread_specific_data()
|
||||
{
|
||||
for (auto iter = non_task_specific_data.begin(); iter != non_task_specific_data.end(); ++iter)
|
||||
if (iter->cleanup)
|
||||
iter->cleanup(iter->value);
|
||||
|
||||
for (auto iter = thread_specific_data.begin(); iter != thread_specific_data.end(); ++iter)
|
||||
if (iter->cleanup)
|
||||
iter->cleanup(iter->value);
|
||||
}
|
||||
|
||||
};
|
||||
} // namespace fc
|
||||
|
|
|
|||
65
src/thread/thread_specific.cpp
Normal file
65
src/thread/thread_specific.cpp
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
#include <fc/log/logger.hpp>
|
||||
#include <fc/thread/thread_specific.hpp>
|
||||
#include "thread_d.hpp"
|
||||
#include <boost/atomic.hpp>
|
||||
|
||||
namespace fc
|
||||
{
|
||||
namespace detail
|
||||
{
|
||||
boost::atomic<unsigned> thread_specific_slot_counter;
|
||||
unsigned get_next_unused_thread_storage_slot()
|
||||
{
|
||||
return thread_specific_slot_counter.fetch_add(1);
|
||||
}
|
||||
|
||||
void* get_specific_data(std::vector<detail::specific_data_info> *specific_data, unsigned slot)
|
||||
{
|
||||
return slot < specific_data->size() ?
|
||||
(*specific_data)[slot].value : nullptr;
|
||||
}
|
||||
void set_specific_data(std::vector<detail::specific_data_info> *specific_data, unsigned slot, void* new_value, void(*cleanup)(void*))
|
||||
{
|
||||
if (slot + 1 > specific_data->size())
|
||||
specific_data->resize(slot + 1);
|
||||
(*specific_data)[slot] = std::move(detail::specific_data_info(new_value, cleanup));
|
||||
}
|
||||
|
||||
void* get_thread_specific_data(unsigned slot)
|
||||
{
|
||||
return get_specific_data(&thread::current().my->thread_specific_data, slot);
|
||||
}
|
||||
void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*))
|
||||
{
|
||||
return set_specific_data(&thread::current().my->thread_specific_data, slot, new_value, cleanup);
|
||||
}
|
||||
|
||||
unsigned get_next_unused_task_storage_slot()
|
||||
{
|
||||
return thread::current().my->next_unused_task_storage_slot++;
|
||||
}
|
||||
void* get_task_specific_data(unsigned slot)
|
||||
{
|
||||
context* current_context = thread::current().my->current;
|
||||
if (!current_context ||
|
||||
!current_context->cur_task)
|
||||
return get_specific_data(&thread::current().my->non_task_specific_data, slot);
|
||||
if (current_context->cur_task->_task_specific_data)
|
||||
return get_specific_data(current_context->cur_task->_task_specific_data, slot);
|
||||
return nullptr;
|
||||
}
|
||||
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*))
|
||||
{
|
||||
context* current_context = thread::current().my->current;
|
||||
if (!current_context ||
|
||||
!current_context->cur_task)
|
||||
set_specific_data(&thread::current().my->non_task_specific_data, slot, new_value, cleanup);
|
||||
else
|
||||
{
|
||||
if (!current_context->cur_task->_task_specific_data)
|
||||
current_context->cur_task->_task_specific_data = new std::vector<detail::specific_data_info>;
|
||||
set_specific_data(current_context->cur_task->_task_specific_data, slot, new_value, cleanup);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // end namespace fc
|
||||
|
|
@ -12,13 +12,56 @@ BOOST_AUTO_TEST_CASE( leave_mutex_locked )
|
|||
{
|
||||
{
|
||||
fc::mutex test_mutex;
|
||||
fc::future<void> test_task = fc::async([&](){ fc::scoped_lock<fc::mutex> test_lock(test_mutex); for (int i = 0; i < 10; ++i) fc::usleep(fc::seconds(1));});
|
||||
fc::future<void> test_task = fc::async([&](){
|
||||
fc::scoped_lock<fc::mutex> test_lock(test_mutex);
|
||||
for (int i = 0; i < 10; ++i)
|
||||
fc::usleep(fc::seconds(1));
|
||||
}, "test_task");
|
||||
fc::usleep(fc::seconds(3));
|
||||
test_task.cancel_and_wait();
|
||||
test_task.cancel_and_wait("cancel called directly by test");
|
||||
}
|
||||
BOOST_TEST_PASSPOINT();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( cancel_task_blocked_on_mutex)
|
||||
{
|
||||
{
|
||||
fc::mutex test_mutex;
|
||||
fc::future<void> test_task;
|
||||
{
|
||||
fc::scoped_lock<fc::mutex> test_lock(test_mutex);
|
||||
test_task = fc::async([&test_mutex](){
|
||||
BOOST_TEST_MESSAGE("--- In test_task, locking mutex");
|
||||
fc::scoped_lock<fc::mutex> async_task_test_lock(test_mutex);
|
||||
BOOST_TEST_MESSAGE("--- In test_task, mutex locked, commencing sleep");
|
||||
for (int i = 0; i < 10; ++i)
|
||||
fc::usleep(fc::seconds(1));
|
||||
BOOST_TEST_MESSAGE("--- In test_task, sleeps done, exiting");
|
||||
}, "test_task");
|
||||
fc::usleep(fc::seconds(3));
|
||||
//test_task.cancel();
|
||||
try
|
||||
{
|
||||
test_task.wait(fc::seconds(1));
|
||||
BOOST_ERROR("test should have been canceled, not exited cleanly");
|
||||
}
|
||||
catch (const fc::canceled_exception&)
|
||||
{
|
||||
BOOST_TEST_PASSPOINT();
|
||||
}
|
||||
catch (const fc::timeout_exception&)
|
||||
{
|
||||
BOOST_ERROR("unable to cancel task blocked on mutex");
|
||||
}
|
||||
BOOST_TEST_MESSAGE("Unlocking mutex locked from the main task so the test task will have the opportunity to lock it and be canceled");
|
||||
}
|
||||
fc::usleep(fc::seconds(3));
|
||||
|
||||
test_task.cancel_and_wait("cleaning up test");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_CASE( test_non_preemptable_assertion )
|
||||
{
|
||||
return; // this isn't a real test, because the thing it tries to test works by asserting, not by throwing
|
||||
|
|
@ -72,7 +115,7 @@ BOOST_AUTO_TEST_CASE( cancel_an_active_task )
|
|||
fc::usleep(fc::milliseconds(100));
|
||||
|
||||
BOOST_TEST_MESSAGE("Canceling task");
|
||||
task.cancel();
|
||||
task.cancel("canceling to test if cancel works");
|
||||
try
|
||||
{
|
||||
task_result result = task.wait();
|
||||
|
|
@ -106,7 +149,7 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task )
|
|||
BOOST_CHECK_MESSAGE(!weak_string_ptr.expired(), "Weak pointer should still be valid because async task should be holding the strong pointer");
|
||||
fc::usleep(fc::milliseconds(100));
|
||||
BOOST_TEST_MESSAGE("Canceling task");
|
||||
task.cancel();
|
||||
task.cancel("canceling to test if cancel works");
|
||||
try
|
||||
{
|
||||
task.wait();
|
||||
|
|
@ -138,7 +181,7 @@ BOOST_AUTO_TEST_CASE( cancel_scheduled_task )
|
|||
simple_task();
|
||||
simple_task();
|
||||
fc::usleep(fc::seconds(4));
|
||||
simple_task_done.cancel();
|
||||
simple_task_done.cancel("canceling scheduled task to test if cancel works");
|
||||
simple_task_done.wait();
|
||||
}
|
||||
catch ( const fc::exception& e )
|
||||
|
|
|
|||
Loading…
Reference in a new issue