diff --git a/include/fc/signals.hpp b/include/fc/signals.hpp index 990a3ea..99d262c 100644 --- a/include/fc/signals.hpp +++ b/include/fc/signals.hpp @@ -4,6 +4,8 @@ #include #include +#include + namespace fc { #if !defined(BOOST_NO_TEMPLATE_ALIASES) template @@ -20,7 +22,8 @@ namespace fc { inline void wait( boost::signal& sig, const microseconds& timeout_us=microseconds::max() ) { promise::ptr p(new promise()); - boost::signals::scoped_connection c = sig.connect( [=]() { p->set_value(); } ); + boost::signals::scoped_connection c = sig.connect( [=]() { slog( "set value!" );p->set_value(); } ); + slog( "wait quit" ); p->wait( timeout_us ); } } diff --git a/include/fc/thread.hpp b/include/fc/thread.hpp index 3d23585..cdd27a7 100644 --- a/include/fc/thread.hpp +++ b/include/fc/thread.hpp @@ -58,6 +58,7 @@ namespace fc { async_task(tsk,prio,desc); return r; } + void poke(); /** diff --git a/src/log.cpp b/src/log.cpp index 9f6c5fe..90efa4f 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -6,6 +6,8 @@ #include #include +#include + namespace fc { const char* thread_name(); void* thread_ptr(); @@ -31,10 +33,10 @@ namespace fc { const char* method_name, const char* format, ... ) { fc::unique_lock lock(log_mutex()); if(isatty(fileno(stderr))) - fprintf( stderr, "\r%s", color ); - - fprintf( stderr, "%p %-15s %-15s %-5zd %-15s ", - thread_ptr(), thread_name(), short_name(file_name), line_num, method_name ); + std::cerr<<"\r"<::ptr p(new promise()); - boost::thread* t = new boost::thread( [this,p]() { + boost::thread* t = new boost::thread( [this,p,name]() { try { this->my = new thread_d(*this); current_thread() = this; @@ -33,6 +33,7 @@ namespace fc { } catch ( ... ) { elog( "Caught unhandled exception" ); } + slog( "exiting %s", name ); } ); p->wait(); my->boost_thread = t; @@ -53,7 +54,10 @@ namespace fc { } thread::~thread() { - delete my; + if( is_current() ) { + delete my; + my = 0; + } } thread& thread::current() { @@ -207,7 +211,14 @@ namespace fc { async_task( t, p, time_point::max(), desc ); } + void thread::poke() { + boost::unique_lock lock(my->task_ready_mutex); + slog("notify one"); + my->task_ready.notify_one(); + } + void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) { + slog( "%s", name().c_str() ); task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed); do { t->_next = stale_head; }while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) ); @@ -217,6 +228,7 @@ namespace fc { // when *this thread is about to block on a wait condition. if( this != ¤t() && !stale_head ) { boost::unique_lock lock(my->task_ready_mutex); + slog("notify one"); my->task_ready.notify_one(); } } @@ -282,7 +294,6 @@ namespace fc { this->async( [=](){ notify(p); } ); return; } - //debug( "begin notify" ); // TODO: store a list of blocked contexts with the promise // to accelerate the lookup.... unless it introduces contention... @@ -293,9 +304,7 @@ namespace fc { fc::context* prev_blocked = 0; while( cur_blocked ) { // if the blocked context is waiting on this promise - // slog( "try unblock ctx %1% from prom %2%", cur_blocked, p.get() ); if( cur_blocked->try_unblock( p.get() ) ) { - //slog( "unblock!" ); // remove it from the blocked list. // remove this context from the sleep queue... @@ -323,9 +332,6 @@ namespace fc { cur_blocked = cur_blocked->next_blocked; } } - //debug( "end notify" ); - - } bool thread::is_current()const { return this == ¤t(); diff --git a/src/thread_d.hpp b/src/thread_d.hpp index 8fa8dc0..815b633 100644 --- a/src/thread_d.hpp +++ b/src/thread_d.hpp @@ -32,11 +32,14 @@ namespace fc { name = fc::string("th_") + char('a'+cnt); cnt++; } + ~thread_d(){ + slog( "...%p %s",this,name.c_str() ); + } fc::thread& self; boost::thread* boost_thread; bc::stack_allocator stack_alloc; - boost::mutex task_ready_mutex; boost::condition_variable task_ready; + boost::mutex task_ready_mutex; boost::atomic task_in_queue; std::vector task_pqueue; @@ -316,9 +319,11 @@ namespace fc { if( has_next_task() ) continue; time_point timeout_time = check_for_timeouts(); + if( done ) return; if( timeout_time == time_point::max() ) { task_ready.wait( lock ); } else if( timeout_time != time_point::min() ) { + slog("timed wait"); task_ready.wait_until( lock, boost::chrono::system_clock::time_point() + boost::chrono::microseconds(timeout_time.time_since_epoch().count()) ); }