This commit is contained in:
Daniel Larimer 2012-09-10 22:13:31 -05:00
parent 2bb9a4fa38
commit 90f9a1f04d
5 changed files with 31 additions and 14 deletions

View file

@ -4,6 +4,8 @@
#include <fc/future.hpp>
#include <fc/thread.hpp>
#include <fc/log.hpp>
namespace fc {
#if !defined(BOOST_NO_TEMPLATE_ALIASES)
template<typename T>
@ -20,7 +22,8 @@ namespace fc {
inline void wait( boost::signal<void()>& sig, const microseconds& timeout_us=microseconds::max() ) {
promise<void>::ptr p(new promise<void>());
boost::signals::scoped_connection c = sig.connect( [=]() { p->set_value(); } );
boost::signals::scoped_connection c = sig.connect( [=]() { slog( "set value!" );p->set_value(); } );
slog( "wait quit" );
p->wait( timeout_us );
}
}

View file

@ -58,6 +58,7 @@ namespace fc {
async_task(tsk,prio,desc);
return r;
}
void poke();
/**

View file

@ -6,6 +6,8 @@
#include <stdarg.h>
#include <unistd.h>
#include <iostream>
namespace fc {
const char* thread_name();
void* thread_ptr();
@ -31,10 +33,10 @@ namespace fc {
const char* method_name, const char* format, ... ) {
fc::unique_lock<boost::mutex> lock(log_mutex());
if(isatty(fileno(stderr)))
fprintf( stderr, "\r%s", color );
fprintf( stderr, "%p %-15s %-15s %-5zd %-15s ",
thread_ptr(), thread_name(), short_name(file_name), line_num, method_name );
std::cerr<<"\r"<<color;
//fprintf( stderr, "%p %-15s %-15s %-5zd %-15s ",
std::cerr<< thread_ptr()<< thread_name()<< short_name(file_name)<< line_num<< method_name ;
va_list args;
va_start(args,format);
vfprintf( stderr, format, args );

View file

@ -24,7 +24,7 @@ namespace fc {
thread::thread( const char* name ) {
promise<void>::ptr p(new promise<void>());
boost::thread* t = new boost::thread( [this,p]() {
boost::thread* t = new boost::thread( [this,p,name]() {
try {
this->my = new thread_d(*this);
current_thread() = this;
@ -33,6 +33,7 @@ namespace fc {
} catch ( ... ) {
elog( "Caught unhandled exception" );
}
slog( "exiting %s", name );
} );
p->wait();
my->boost_thread = t;
@ -53,7 +54,10 @@ namespace fc {
}
thread::~thread() {
delete my;
if( is_current() ) {
delete my;
my = 0;
}
}
thread& thread::current() {
@ -207,7 +211,14 @@ namespace fc {
async_task( t, p, time_point::max(), desc );
}
void thread::poke() {
boost::unique_lock<boost::mutex> lock(my->task_ready_mutex);
slog("notify one");
my->task_ready.notify_one();
}
void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) {
slog( "%s", name().c_str() );
task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
do { t->_next = stale_head;
}while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );
@ -217,6 +228,7 @@ namespace fc {
// when *this thread is about to block on a wait condition.
if( this != &current() && !stale_head ) {
boost::unique_lock<boost::mutex> lock(my->task_ready_mutex);
slog("notify one");
my->task_ready.notify_one();
}
}
@ -282,7 +294,6 @@ namespace fc {
this->async( [=](){ notify(p); } );
return;
}
//debug( "begin notify" );
// TODO: store a list of blocked contexts with the promise
// to accelerate the lookup.... unless it introduces contention...
@ -293,9 +304,7 @@ namespace fc {
fc::context* prev_blocked = 0;
while( cur_blocked ) {
// if the blocked context is waiting on this promise
// slog( "try unblock ctx %1% from prom %2%", cur_blocked, p.get() );
if( cur_blocked->try_unblock( p.get() ) ) {
//slog( "unblock!" );
// remove it from the blocked list.
// remove this context from the sleep queue...
@ -323,9 +332,6 @@ namespace fc {
cur_blocked = cur_blocked->next_blocked;
}
}
//debug( "end notify" );
}
bool thread::is_current()const {
return this == &current();

View file

@ -32,11 +32,14 @@ namespace fc {
name = fc::string("th_") + char('a'+cnt);
cnt++;
}
~thread_d(){
slog( "...%p %s",this,name.c_str() );
}
fc::thread& self;
boost::thread* boost_thread;
bc::stack_allocator stack_alloc;
boost::mutex task_ready_mutex;
boost::condition_variable task_ready;
boost::mutex task_ready_mutex;
boost::atomic<task_base*> task_in_queue;
std::vector<task_base*> task_pqueue;
@ -316,9 +319,11 @@ namespace fc {
if( has_next_task() ) continue;
time_point timeout_time = check_for_timeouts();
if( done ) return;
if( timeout_time == time_point::max() ) {
task_ready.wait( lock );
} else if( timeout_time != time_point::min() ) {
slog("timed wait");
task_ready.wait_until( lock, boost::chrono::system_clock::time_point() +
boost::chrono::microseconds(timeout_time.time_since_epoch().count()) );
}