Merge branch 'master' of github.com:InvictusInnovations/fc

This commit is contained in:
Nathan Hourt 2014-10-17 13:43:46 -04:00
commit fadc0512a1
7 changed files with 398 additions and 275 deletions

View file

@ -15,6 +15,7 @@ namespace fc {
} }
static priority max() { return priority(10000); } static priority max() { return priority(10000); }
static priority min() { return priority(-10000); } static priority min() { return priority(-10000); }
static priority _internal__priority_for_short_sleeps() { return priority(-100000); }
int value; int value;
}; };
} }

View file

@ -51,7 +51,7 @@ namespace fc
{ {
uint64_t ntp_timestamp_host = bswap_64(ntp_timestamp_net_order); uint64_t ntp_timestamp_host = bswap_64(ntp_timestamp_net_order);
uint32_t fractional_seconds = ntp_timestamp_host & 0xffffffff; uint32_t fractional_seconds = ntp_timestamp_host & 0xffffffff;
uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (UINT64_C(1) << 31)) >> 32); uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (uint64_t(1) << 31)) >> 32);
uint32_t seconds_since_1900 = ntp_timestamp_host >> 32; uint32_t seconds_since_1900 = ntp_timestamp_host >> 32;
uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800;
return fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds); return fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds);
@ -63,7 +63,7 @@ namespace fc
uint32_t seconds_since_epoch = (uint32_t)(microseconds_since_epoch / 1000000); uint32_t seconds_since_epoch = (uint32_t)(microseconds_since_epoch / 1000000);
uint32_t seconds_since_1900 = seconds_since_epoch + 2208988800; uint32_t seconds_since_1900 = seconds_since_epoch + 2208988800;
uint32_t microseconds = microseconds_since_epoch % 1000000; uint32_t microseconds = microseconds_since_epoch % 1000000;
uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (UINT64_C(1) << 31)) / 1000000); uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (uint64_t(1) << 31)) / 1000000);
uint64_t ntp_timestamp_net_order = ((uint64_t)seconds_since_1900 << 32) + fractional_seconds; uint64_t ntp_timestamp_net_order = ((uint64_t)seconds_since_1900 << 32) + fractional_seconds;
return bswap_64(ntp_timestamp_net_order); return bswap_64(ntp_timestamp_net_order);
} }

View file

@ -2,8 +2,9 @@
#include <fc/crypto/bigint.hpp> #include <fc/crypto/bigint.hpp>
#include <fc/exception/exception.hpp> #include <fc/exception/exception.hpp>
#include <sstream> #include <sstream>
#include <stdint.h>
#define PRECISION (1000000ll * 1000000ll * 1000000ll) #define PRECISION (uint64_t(1000000) * uint64_t(1000000) * uint64_t(1000000))
namespace fc namespace fc
{ {

View file

@ -60,7 +60,8 @@ namespace fc {
cancellation_reason(nullptr), cancellation_reason(nullptr),
#endif #endif
complete(false), complete(false),
cur_task(0) cur_task(0),
context_posted_num(0)
{ {
#if BOOST_VERSION >= 105600 #if BOOST_VERSION >= 105600
size_t stack_size = stack_allocator::traits_type::default_size() * 4; size_t stack_size = stack_allocator::traits_type::default_size() * 4;
@ -99,7 +100,8 @@ namespace fc {
cancellation_reason(nullptr), cancellation_reason(nullptr),
#endif #endif
complete(false), complete(false),
cur_task(0) cur_task(0),
context_posted_num(0)
{} {}
~context() { ~context() {
@ -229,6 +231,7 @@ namespace fc {
#endif #endif
bool complete; bool complete;
task_base* cur_task; task_base* cur_task;
uint64_t context_posted_num; // serial number set each tiem the context is added to the ready list
}; };
} // naemspace fc } // naemspace fc

View file

@ -173,7 +173,7 @@ namespace fc {
// move all sleep tasks to ready // move all sleep tasks to ready
for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) {
my->ready_push_front( my->sleep_pqueue[i] ); my->add_context_to_ready_list( my->sleep_pqueue[i] );
} }
my->sleep_pqueue.clear(); my->sleep_pqueue.clear();
@ -182,21 +182,19 @@ namespace fc {
while( cur ) { while( cur ) {
fc::context* n = cur->next; fc::context* n = cur->next;
cur->next = 0; cur->next = 0;
my->ready_push_front( cur ); my->add_context_to_ready_list( cur );
cur = n; cur = n;
} }
// mark all ready tasks (should be everyone)... as canceled // mark all ready tasks (should be everyone)... as canceled
cur = my->ready_head; for (fc::context* ready_context : my->ready_heap)
while( cur ) { ready_context->canceled = true;
cur->canceled = true;
cur = cur->next;
}
my->done = true; my->done = true;
// now that we have poked all fibers... switch to the next one and // now that we have poked all fibers... switch to the next one and
// let them all quit. // let them all quit.
while( my->ready_head ) { while (!my->ready_heap.empty())
{
my->start_next_fiber(true); my->start_next_fiber(true);
my->check_for_timeouts(); my->check_for_timeouts();
} }
@ -204,9 +202,13 @@ namespace fc {
my->cleanup_thread_specific_data(); my->cleanup_thread_specific_data();
} }
void thread::exec() { void thread::exec()
if( !my->current ) my->current = new fc::context(&fc::thread::current()); {
try { if( !my->current )
my->current = new fc::context(&fc::thread::current());
try
{
my->process_tasks(); my->process_tasks();
} }
catch( canceled_exception& e ) catch( canceled_exception& e )
@ -217,71 +219,75 @@ namespace fc {
my->current = 0; my->current = 0;
} }
bool thread::is_running()const { bool thread::is_running()const
{
return !my->done; return !my->done;
} }
priority thread::current_priority()const { priority thread::current_priority()const
{
BOOST_ASSERT(my); BOOST_ASSERT(my);
if( my->current ) return my->current->prio; if( my->current )
return my->current->prio;
return priority(); return priority();
} }
void thread::yield(bool reschedule ) { void thread::yield(bool reschedule)
{
my->check_fiber_exceptions(); my->check_fiber_exceptions();
my->start_next_fiber(reschedule); my->start_next_fiber(reschedule);
my->check_fiber_exceptions(); my->check_fiber_exceptions();
} }
void thread::sleep_until( const time_point& tp ) { void thread::sleep_until( const time_point& tp )
if( tp <= (time_point::now()+fc::microseconds(10000)) )
{ {
this->yield(true); if( tp <= (time_point::now()+fc::microseconds(10000)) )
} yield(true);
my->yield_until( tp, false ); my->yield_until( tp, false );
} }
int thread::wait_any_until( std::vector<promise_base::ptr>&& p, const time_point& timeout) { int thread::wait_any_until( std::vector<promise_base::ptr>&& p, const time_point& timeout) {
for( size_t i = 0; i < p.size(); ++i ) { for( size_t i = 0; i < p.size(); ++i )
if( p[i]->ready() ) return i; if( p[i]->ready() )
} return i;
if( timeout < time_point::now() ) { if( timeout < time_point::now() )
{
fc::stringstream ss; fc::stringstream ss;
for( auto i = p.begin(); i != p.end(); ++i ) { for( auto i = p.begin(); i != p.end(); ++i )
ss << (*i)->get_desc() << ", "; ss << (*i)->get_desc() << ", ";
}
FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) ); FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task",ss.str()) );
} }
if( !my->current ) { if( !my->current )
my->current = new fc::context(&fc::thread::current()); my->current = new fc::context(&fc::thread::current());
}
for( uint32_t i = 0; i < p.size(); ++i ) { for( uint32_t i = 0; i < p.size(); ++i )
my->current->add_blocking_promise(p[i].get(),false); my->current->add_blocking_promise(p[i].get(),false);
};
// if not max timeout, added to sleep pqueue // if not max timeout, added to sleep pqueue
if( timeout != time_point::maximum() ) { if( timeout != time_point::maximum() )
{
my->current->resume_time = timeout; my->current->resume_time = timeout;
my->sleep_pqueue.push_back(my->current); my->sleep_pqueue.push_back(my->current);
std::push_heap( my->sleep_pqueue.begin(), std::push_heap( my->sleep_pqueue.begin(),
my->sleep_pqueue.end(), my->sleep_pqueue.end(),
sleep_priority_less() ); sleep_priority_less() );
} }
my->add_to_blocked( my->current ); my->add_to_blocked( my->current );
my->start_next_fiber(); my->start_next_fiber();
for( auto i = p.begin(); i != p.end(); ++i ) { for( auto i = p.begin(); i != p.end(); ++i )
my->current->remove_blocking_promise(i->get()); my->current->remove_blocking_promise(i->get());
}
my->check_fiber_exceptions(); my->check_fiber_exceptions();
for( uint32_t i = 0; i < p.size(); ++i ) { for( uint32_t i = 0; i < p.size(); ++i )
if( p[i]->ready() ) return i; if( p[i]->ready() )
} return i;
//BOOST_THROW_EXCEPTION( wait_any_error() ); //BOOST_THROW_EXCEPTION( wait_any_error() );
return -1; return -1;
} }
@ -323,30 +329,38 @@ namespace fc {
thread::current().sleep_until(tp); thread::current().sleep_until(tp);
} }
void exec() { void exec()
{
return thread::current().exec(); return thread::current().exec();
} }
int wait_any( std::vector<promise_base::ptr>&& v, const microseconds& timeout_us ) { int wait_any( std::vector<promise_base::ptr>&& v, const microseconds& timeout_us )
{
return thread::current().wait_any_until( fc::move(v), time_point::now() + timeout_us ); return thread::current().wait_any_until( fc::move(v), time_point::now() + timeout_us );
} }
int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& tp ) {
int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& tp )
{
return thread::current().wait_any_until( fc::move(v), tp ); return thread::current().wait_any_until( fc::move(v), tp );
} }
void thread::wait_until( promise_base::ptr&& p, const time_point& timeout ) {
if( p->ready() ) return; void thread::wait_until( promise_base::ptr&& p, const time_point& timeout )
{
if( p->ready() )
return;
if( timeout < time_point::now() ) if( timeout < time_point::now() )
FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) ); FC_THROW_EXCEPTION( timeout_exception, "${task}", ("task", p->get_desc()) );
if( !my->current ) { if( !my->current )
my->current = new fc::context(&fc::thread::current()); my->current = new fc::context(&fc::thread::current());
}
//slog( " %1% blocking on %2%", my->current, p.get() ); //slog( " %1% blocking on %2%", my->current, p.get() );
my->current->add_blocking_promise(p.get(), true); my->current->add_blocking_promise(p.get(), true);
// if not max timeout, added to sleep pqueue // if not max timeout, added to sleep pqueue
if( timeout != time_point::maximum() ) { if( timeout != time_point::maximum() )
{
my->current->resume_time = timeout; my->current->resume_time = timeout;
my->sleep_pqueue.push_back(my->current); my->sleep_pqueue.push_back(my->current);
std::push_heap( my->sleep_pqueue.begin(), std::push_heap( my->sleep_pqueue.begin(),
@ -368,10 +382,12 @@ namespace fc {
my->check_fiber_exceptions(); my->check_fiber_exceptions();
} }
void thread::notify( const promise_base::ptr& p ) { void thread::notify( const promise_base::ptr& p )
{
//slog( "this %p my %p", this, my ); //slog( "this %p my %p", this, my );
BOOST_ASSERT(p->ready()); BOOST_ASSERT(p->ready());
if( !is_current() ) { if( !is_current() )
{
this->async( [=](){ notify(p); }, "notify", priority::max() ); this->async( [=](){ notify(p); }, "notify", priority::max() );
return; return;
} }
@ -383,14 +399,18 @@ namespace fc {
fc::context* cur_blocked = my->blocked; fc::context* cur_blocked = my->blocked;
fc::context* prev_blocked = 0; fc::context* prev_blocked = 0;
while( cur_blocked ) { while( cur_blocked )
{
// if the blocked context is waiting on this promise // if the blocked context is waiting on this promise
if( cur_blocked->try_unblock( p.get() ) ) { if( cur_blocked->try_unblock( p.get() ) )
{
// remove it from the blocked list. // remove it from the blocked list.
// remove this context from the sleep queue... // remove this context from the sleep queue...
for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i ) { for( uint32_t i = 0; i < my->sleep_pqueue.size(); ++i )
if( my->sleep_pqueue[i] == cur_blocked ) { {
if( my->sleep_pqueue[i] == cur_blocked )
{
my->sleep_pqueue[i]->blocking_prom.clear(); my->sleep_pqueue[i]->blocking_prom.clear();
my->sleep_pqueue[i] = my->sleep_pqueue.back(); my->sleep_pqueue[i] = my->sleep_pqueue.back();
my->sleep_pqueue.pop_back(); my->sleep_pqueue.pop_back();
@ -399,22 +419,29 @@ namespace fc {
} }
} }
auto cur = cur_blocked; auto cur = cur_blocked;
if( prev_blocked ) { if( prev_blocked )
{
prev_blocked->next_blocked = cur_blocked->next_blocked; prev_blocked->next_blocked = cur_blocked->next_blocked;
cur_blocked = prev_blocked->next_blocked; cur_blocked = prev_blocked->next_blocked;
} else { }
else
{
my->blocked = cur_blocked->next_blocked; my->blocked = cur_blocked->next_blocked;
cur_blocked = my->blocked; cur_blocked = my->blocked;
} }
cur->next_blocked = 0; cur->next_blocked = 0;
my->ready_push_front( cur ); my->add_context_to_ready_list( cur );
} else { // goto the next blocked task }
else
{ // goto the next blocked task
prev_blocked = cur_blocked; prev_blocked = cur_blocked;
cur_blocked = cur_blocked->next_blocked; cur_blocked = cur_blocked->next_blocked;
} }
} }
} }
bool thread::is_current()const {
bool thread::is_current()const
{
return this == &current(); return this == &current();
} }

View file

@ -21,12 +21,10 @@ namespace fc {
thread_d(fc::thread& s) thread_d(fc::thread& s)
:self(s), boost_thread(0), :self(s), boost_thread(0),
task_in_queue(0), task_in_queue(0),
next_task_posted_num(1), next_posted_num(1),
done(false), done(false),
current(0), current(0),
pt_head(0), pt_head(0),
ready_head(0),
ready_tail(0),
blocked(0), blocked(0),
next_unused_task_storage_slot(0) next_unused_task_storage_slot(0)
#ifndef NDEBUG #ifndef NDEBUG
@ -37,15 +35,14 @@ namespace fc {
name = fc::string("th_") + char('a'+cnt++); name = fc::string("th_") + char('a'+cnt++);
// printf("thread=%p\n",this); // printf("thread=%p\n",this);
} }
~thread_d(){
~thread_d()
{
delete current; delete current;
fc::context* temp; fc::context* temp;
while (ready_head) for (fc::context* ready_context : ready_heap)
{ delete ready_context;
temp = ready_head->next; ready_heap.clear();
delete ready_head;
ready_head = temp;
}
while (blocked) while (blocked)
{ {
temp = blocked->next; temp = blocked->next;
@ -67,6 +64,7 @@ namespace fc {
delete boost_thread; delete boost_thread;
} }
} }
fc::thread& self; fc::thread& self;
boost::thread* boost_thread; boost::thread* boost_thread;
stack_allocator stack_alloc; stack_allocator stack_alloc;
@ -75,7 +73,7 @@ namespace fc {
boost::atomic<task_base*> task_in_queue; boost::atomic<task_base*> task_in_queue;
std::vector<task_base*> task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time std::vector<task_base*> task_pqueue; // heap of tasks that have never started, ordered by proirity & scheduling time
uint64_t next_task_posted_num; // each task gets assigned a number in the order it is started, tracked here uint64_t next_posted_num; // each task or context gets assigned a number in the order it is ready to execute, tracked here
std::vector<task_base*> task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run std::vector<task_base*> task_sch_queue; // heap of tasks that have never started but are scheduled for a time in the future, ordered by the time they should be run
std::vector<fc::context*> sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume std::vector<fc::context*> sleep_pqueue; // heap of running tasks that have sleeped, ordered by the time they should resume
std::vector<fc::context*> free_list; // list of unused contexts that are ready for deletion std::vector<fc::context*> free_list; // list of unused contexts that are ready for deletion
@ -86,8 +84,7 @@ namespace fc {
fc::context* pt_head; // list of contexts that can be reused for new tasks fc::context* pt_head; // list of contexts that can be reused for new tasks
fc::context* ready_head; // linked list (using 'next') of contexts that are ready to run std::vector<fc::context*> ready_heap; // priority heap of contexts that are ready to run
fc::context* ready_tail;
fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait() fc::context* blocked; // linked list of contexts (using 'next_blocked') blocked on promises via wait()
@ -149,12 +146,14 @@ namespace fc {
} }
#endif #endif
// insert at from of blocked linked list // insert at from of blocked linked list
inline void add_to_blocked( fc::context* c ) { inline void add_to_blocked( fc::context* c )
{
c->next_blocked = blocked; c->next_blocked = blocked;
blocked = c; blocked = c;
} }
void pt_push_back(fc::context* c) { void pt_push_back(fc::context* c)
{
c->next = pt_head; c->next = pt_head;
pt_head = c; pt_head = c;
/* /*
@ -167,46 +166,55 @@ namespace fc {
wlog( "idle context...%2% %1%", c, i ); wlog( "idle context...%2% %1%", c, i );
*/ */
} }
fc::context::ptr ready_pop_front() {
fc::context::ptr tmp = nullptr; fc::context::ptr ready_pop_front()
if( ready_head ) { {
tmp = ready_head; fc::context* highest_priority_context = ready_heap.front();
ready_head = tmp->next; std::pop_heap(ready_heap.begin(), ready_heap.end(), task_priority_less());
if( !ready_head ) ready_heap.pop_back();
ready_tail = nullptr; return highest_priority_context;
tmp->next = nullptr;
} }
return tmp;
void add_context_to_ready_list(context* context_to_add, bool at_end = false)
{
context_to_add->context_posted_num = next_posted_num++;
ready_heap.push_back(context_to_add);
std::push_heap(ready_heap.begin(), ready_heap.end(), task_priority_less());
} }
void ready_push_front( const fc::context::ptr& c ) {
BOOST_ASSERT( c->next == nullptr ); struct task_priority_less
BOOST_ASSERT( c != current ); {
//if( c == current ) wlog( "pushing current to ready??" ); bool operator()(const task_base* a, const task_base* b) const
c->next = ready_head; {
ready_head = c; return a->_prio.value < b->_prio.value ? true :
if( !ready_tail ) (a->_prio.value > b->_prio.value ? false :
ready_tail = c; a->_posted_num > b->_posted_num);
} }
void ready_push_back( const fc::context::ptr& c ) { bool operator()(const task_base* a, const context* b) const
BOOST_ASSERT( c->next == nullptr ); {
BOOST_ASSERT( c != current ); return a->_prio.value < b->prio.value ? true :
//if( c == current ) wlog( "pushing current to ready??" ); (a->_prio.value > b->prio.value ? false :
c->next = 0; a->_posted_num > b->context_posted_num);
if( ready_tail ) {
ready_tail->next = c;
} else {
assert( !ready_head );
ready_head = c;
} }
ready_tail = c; bool operator()(const context* a, const task_base* b) const
{
return a->prio.value < b->_prio.value ? true :
(a->prio.value > b->_prio.value ? false :
a->context_posted_num > b->_posted_num);
} }
struct task_priority_less { bool operator()(const context* a, const context* b) const
bool operator()( task_base* a, task_base* b ) { {
return a->_prio.value < b->_prio.value ? true : (a->_prio.value > b->_prio.value ? false : a->_posted_num > b->_posted_num ); return a->prio.value < b->prio.value ? true :
(a->prio.value > b->prio.value ? false :
a->context_posted_num > b->context_posted_num);
} }
}; };
struct task_when_less {
bool operator()( task_base* a, task_base* b ) { struct task_when_less
{
bool operator()( task_base* a, task_base* b )
{
return a->_when > b->_when; return a->_when > b->_when;
} }
}; };
@ -229,7 +237,7 @@ namespace fc {
} }
cur = t; cur = t;
next_task_posted_num += num_ready_tasks; next_posted_num += num_ready_tasks;
unsigned tasks_posted = 0; unsigned tasks_posted = 0;
while (cur) while (cur)
{ {
@ -241,7 +249,7 @@ namespace fc {
} }
else else
{ {
cur->_posted_num = next_task_posted_num - (++tasks_posted); cur->_posted_num = next_posted_num - (++tasks_posted);
task_pqueue.push_back(cur); task_pqueue.push_back(cur);
std::push_heap(task_pqueue.begin(), std::push_heap(task_pqueue.begin(),
task_pqueue.end(), task_priority_less()); task_pqueue.end(), task_priority_less());
@ -251,31 +259,46 @@ namespace fc {
} }
} }
task_base* dequeue() { void move_newly_scheduled_tasks_to_task_pqueue()
// get a new task {
BOOST_ASSERT(this == thread::current().my); BOOST_ASSERT(this == thread::current().my);
task_base* pending = 0; // first, if there are any new tasks on 'task_in_queue', which is tasks that
// have been just been async or scheduled, but we haven't processed them.
// move them into the task_sch_queue or task_pqueue, as appropriate
//DLN: changed from memory_order_consume for boost 1.55. //DLN: changed from memory_order_consume for boost 1.55.
//This appears to be safest replacement for now, maybe //This appears to be safest replacement for now, maybe
//can be changed to relaxed later, but needs analysis. //can be changed to relaxed later, but needs analysis.
pending = task_in_queue.exchange(0,boost::memory_order_seq_cst); task_base* pending_list = task_in_queue.exchange(0, boost::memory_order_seq_cst);
if( pending ) { enqueue( pending ); } if (pending_list)
enqueue(pending_list);
task_base* p(0); // second, walk through task_sch_queue and move any scheduled tasks that are now
if( task_sch_queue.size() ) { // able to run (because their scheduled time has arrived) to task_pqueue
if( task_sch_queue.front()->_when <= time_point::now() ) {
p = task_sch_queue.front(); while (!task_sch_queue.empty() &&
task_sch_queue.front()->_when <= time_point::now())
{
task_base* ready_task = task_sch_queue.front();
std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less()); std::pop_heap(task_sch_queue.begin(), task_sch_queue.end(), task_when_less());
task_sch_queue.pop_back(); task_sch_queue.pop_back();
return p;
ready_task->_posted_num = next_posted_num++;
task_pqueue.push_back(ready_task);
std::push_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less());
} }
} }
if( task_pqueue.size() ) {
p = task_pqueue.front(); task_base* dequeue()
{
// get a new task
BOOST_ASSERT( this == thread::current().my );
assert(!task_pqueue.empty());
task_base* p = task_pqueue.front();
std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() ); std::pop_heap(task_pqueue.begin(), task_pqueue.end(), task_priority_less() );
task_pqueue.pop_back(); task_pqueue.pop_back();
}
return p; return p;
} }
@ -307,7 +330,8 @@ namespace fc {
* This should be before or after a context switch to * This should be before or after a context switch to
* detect quit/cancel operations and throw an exception. * detect quit/cancel operations and throw an exception.
*/ */
void check_fiber_exceptions() { void check_fiber_exceptions()
{
if( current && current->canceled ) { if( current && current->canceled ) {
#ifdef NDEBUG #ifdef NDEBUG
FC_THROW_EXCEPTION( canceled_exception, "" ); FC_THROW_EXCEPTION( canceled_exception, "" );
@ -326,7 +350,8 @@ namespace fc {
* If none are available then create a new context and * If none are available then create a new context and
* have it wait for something to do. * have it wait for something to do.
*/ */
bool start_next_fiber( bool reschedule = false ) { bool start_next_fiber( bool reschedule = false )
{
/* If this assert fires, it means you are executing an operation that is causing /* If this assert fires, it means you are executing an operation that is causing
* the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect * the current task to yield, but there is a ASSERT_TASK_NOT_PREEMPTED() in effect
* (somewhere up the stack) */ * (somewhere up the stack) */
@ -342,12 +367,17 @@ namespace fc {
assert(std::current_exception() == std::exception_ptr()); assert(std::current_exception() == std::exception_ptr());
check_for_timeouts(); check_for_timeouts();
if( !current ) current = new fc::context( &fc::thread::current() ); if( !current )
current = new fc::context( &fc::thread::current() );
priority original_priority = current->prio;
// check to see if any other contexts are ready // check to see if any other contexts are ready
if( ready_head ) { if (!ready_heap.empty())
{
fc::context* next = ready_pop_front(); fc::context* next = ready_pop_front();
if( next == current ) { if (next == current)
{
// elog( "next == current... something went wrong" ); // elog( "next == current... something went wrong" );
assert(next != current); assert(next != current);
return false; return false;
@ -357,7 +387,11 @@ namespace fc {
// jump to next context, saving current context // jump to next context, saving current context
fc::context* prev = current; fc::context* prev = current;
current = next; current = next;
if( reschedule ) ready_push_back(prev); if (reschedule)
{
current->prio = priority::_internal__priority_for_short_sleeps();
add_context_to_ready_list(prev, true);
}
// slog( "jump to %p from %p", next, prev ); // slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
#if BOOST_VERSION >= 105600 #if BOOST_VERSION >= 105600
@ -370,23 +404,35 @@ namespace fc {
BOOST_ASSERT( current ); BOOST_ASSERT( current );
BOOST_ASSERT( current == prev ); BOOST_ASSERT( current == prev );
//current = prev; //current = prev;
} else { // all contexts are blocked, create a new context }
else
{
// all contexts are blocked, create a new context
// that will process posted tasks... // that will process posted tasks...
fc::context* prev = current; fc::context* prev = current;
fc::context* next = nullptr; fc::context* next = nullptr;
if( pt_head ) { // grab cached context if( pt_head )
{
// grab cached context
next = pt_head; next = pt_head;
pt_head = pt_head->next; pt_head = pt_head->next;
next->next = 0; next->next = 0;
next->reinitialize(); next->reinitialize();
} else { // create new context. }
else
{
// create new context.
next = new fc::context( &thread_d::start_process_tasks, stack_alloc, next = new fc::context( &thread_d::start_process_tasks, stack_alloc,
&fc::thread::current() ); &fc::thread::current() );
} }
current = next; current = next;
if( reschedule ) ready_push_back(prev); if( reschedule )
{
current->prio = priority::_internal__priority_for_short_sleeps();
add_context_to_ready_list(prev, true);
}
// slog( "jump to %p from %p", next, prev ); // slog( "jump to %p from %p", next, prev );
// fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) ); // fc_dlog( logger::get("fc_context"), "from ${from} to ${to}", ( "from", int64_t(prev) )( "to", int64_t(next) ) );
@ -402,7 +448,11 @@ namespace fc {
//current = prev; //current = prev;
} }
if( current->canceled ) { if (reschedule)
current->prio = original_priority;
if( current->canceled )
{
//current->canceled = false; //current->canceled = false;
#ifdef NDEBUG #ifdef NDEBUG
FC_THROW_EXCEPTION( canceled_exception, "" ); FC_THROW_EXCEPTION( canceled_exception, "" );
@ -414,13 +464,19 @@ namespace fc {
return true; return true;
} }
static void start_process_tasks( intptr_t my ) { static void start_process_tasks( intptr_t my )
{
thread_d* self = (thread_d*)my; thread_d* self = (thread_d*)my;
try { try
{
self->process_tasks(); self->process_tasks();
} catch ( canceled_exception& ) { }
catch ( canceled_exception& )
{
// allowed exception... // allowed exception...
} catch ( ... ) { }
catch ( ... )
{
elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) ); elog( "fiber ${name} exited with uncaught exception: ${e}", ("e",fc::except_str())("name", self->name) );
// assert( !"fiber exited with uncaught exception" ); // assert( !"fiber exited with uncaught exception" );
//TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<< //TODO replace errror fc::cerr<<"fiber exited with uncaught exception:\n "<<
@ -430,11 +486,10 @@ namespace fc {
self->start_next_fiber( false ); self->start_next_fiber( false );
} }
bool run_next_task() { void run_next_task()
check_for_timeouts(); {
task_base* next = dequeue(); task_base* next = dequeue();
if( next ) {
next->_set_active_context( current ); next->_set_active_context( current );
current->cur_task = next; current->cur_task = next;
next->run(); next->run();
@ -442,48 +497,80 @@ namespace fc {
next->_set_active_context(0); next->_set_active_context(0);
next->release(); next->release();
current->reinitialize(); current->reinitialize();
return true;
} }
return false;
} bool has_next_task()
bool has_next_task() { {
if( task_pqueue.size() || if( task_pqueue.size() ||
(task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) || (task_sch_queue.size() && task_sch_queue.front()->_when <= time_point::now()) ||
task_in_queue.load( boost::memory_order_relaxed ) ) task_in_queue.load( boost::memory_order_relaxed ) )
return true; return true;
return false; return false;
} }
void clear_free_list() {
for( uint32_t i = 0; i < free_list.size(); ++i ) { void clear_free_list()
{
for( uint32_t i = 0; i < free_list.size(); ++i )
delete free_list[i]; delete free_list[i];
}
free_list.clear(); free_list.clear();
} }
void process_tasks() {
while( !done || blocked ) { void process_tasks()
if( run_next_task() ) continue; {
while( !done || blocked )
{
// move all new tasks to the task_pqueue
move_newly_scheduled_tasks_to_task_pqueue();
// move all now-ready sleeping tasks to the ready list
check_for_timeouts();
if (!task_pqueue.empty())
{
if (!ready_heap.empty())
{
// a new task and an existing task are both ready to go
if (task_priority_less()(task_pqueue.front(), ready_heap.front()))
{
// run the existing task first
pt_push_back(current);
start_next_fiber(false);
continue;
}
}
// if we made it here, either there's no ready context, or the ready context is
// scheduled after the ready task, so we should run the task first
run_next_task();
continue;
}
// if I have something else to do other than // if I have something else to do other than
// process tasks... do it. // process tasks... do it.
if( ready_head ) { if (!ready_heap.empty())
{
pt_push_back( current ); pt_push_back( current );
start_next_fiber(false); start_next_fiber(false);
continue; continue;
} }
if( process_canceled_tasks() ) continue; if( process_canceled_tasks() )
continue;
clear_free_list(); clear_free_list();
{ // lock scope { // lock scope
boost::unique_lock<boost::mutex> lock(task_ready_mutex); boost::unique_lock<boost::mutex> lock(task_ready_mutex);
if( has_next_task() ) continue; if( has_next_task() )
continue;
time_point timeout_time = check_for_timeouts(); time_point timeout_time = check_for_timeouts();
if( done ) return; if( done )
if( timeout_time == time_point::maximum() ) { return;
if( timeout_time == time_point::maximum() )
task_ready.wait( lock ); task_ready.wait( lock );
} else if( timeout_time != time_point::min() ) { else if( timeout_time != time_point::min() )
{
// there may be tasks that have been canceled we should filter them out now // there may be tasks that have been canceled we should filter them out now
// rather than waiting... // rather than waiting...
@ -515,20 +602,23 @@ namespace fc {
* Retunn system_clock::time_point::max() if there are no scheduled tasks * Retunn system_clock::time_point::max() if there are no scheduled tasks
* Return the time the next task needs to be run if there is anything scheduled. * Return the time the next task needs to be run if there is anything scheduled.
*/ */
time_point check_for_timeouts() { time_point check_for_timeouts()
if( !sleep_pqueue.size() && !task_sch_queue.size() ) { {
if( !sleep_pqueue.size() && !task_sch_queue.size() )
{
// ilog( "no timeouts ready" ); // ilog( "no timeouts ready" );
return time_point::maximum(); return time_point::maximum();
} }
time_point next = time_point::maximum(); time_point next = time_point::maximum();
if( sleep_pqueue.size() && next > sleep_pqueue.front()->resume_time ) if( !sleep_pqueue.empty() && next > sleep_pqueue.front()->resume_time )
next = sleep_pqueue.front()->resume_time; next = sleep_pqueue.front()->resume_time;
if( task_sch_queue.size() && next > task_sch_queue.front()->_when ) if( !task_sch_queue.empty() && next > task_sch_queue.front()->_when )
next = task_sch_queue.front()->_when; next = task_sch_queue.front()->_when;
time_point now = time_point::now(); time_point now = time_point::now();
if( now < next ) { return next; } if( now < next )
return next;
// move all expired sleeping tasks to the ready queue // move all expired sleeping tasks to the ready queue
while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now ) while( sleep_pqueue.size() && sleep_pqueue.front()->resume_time < now )
@ -540,7 +630,7 @@ namespace fc {
if( c->blocking_prom.size() ) if( c->blocking_prom.size() )
{ {
// ilog( "timeotu blocking prom" ); // ilog( "timeout blocking prom" );
c->timeout_blocking_promises(); c->timeout_blocking_promises();
} }
else else
@ -548,31 +638,32 @@ namespace fc {
// ilog( "..." ); // ilog( "..." );
// ilog( "ready_push_front" ); // ilog( "ready_push_front" );
if (c != current) if (c != current)
ready_push_front( c ); add_context_to_ready_list(c);
} }
} }
return time_point::min(); return time_point::min();
} }
void unblock( fc::context* c ) { void unblock( fc::context* c )
if( fc::thread::current().my != this ) { {
if( fc::thread::current().my != this )
{
self.async( [=](){ unblock(c); }, "thread_d::unblock" ); self.async( [=](){ unblock(c); }, "thread_d::unblock" );
return; return;
} }
if( c != current ) ready_push_front(c);
if (c != current)
add_context_to_ready_list(c);
} }
void yield_until( const time_point& tp, bool reschedule ) { void yield_until( const time_point& tp, bool reschedule ) {
check_fiber_exceptions(); check_fiber_exceptions();
if( tp <= (time_point::now()+fc::microseconds(10000)) ) if( tp <= (time_point::now()+fc::microseconds(10000)) )
{
return; return;
}
if( !current ) { if( !current )
current = new fc::context(&fc::thread::current()); current = new fc::context(&fc::thread::current());
}
current->resume_time = tp; current->resume_time = tp;
current->clear_blocking_promises(); current->clear_blocking_promises();
@ -584,8 +675,10 @@ namespace fc {
start_next_fiber(reschedule); start_next_fiber(reschedule);
// clear current context from sleep queue... // clear current context from sleep queue...
for( uint32_t i = 0; i < sleep_pqueue.size(); ++i ) { for( uint32_t i = 0; i < sleep_pqueue.size(); ++i )
if( sleep_pqueue[i] == current ) { {
if( sleep_pqueue[i] == current )
{
sleep_pqueue[i] = sleep_pqueue.back(); sleep_pqueue[i] = sleep_pqueue.back();
sleep_pqueue.pop_back(); sleep_pqueue.pop_back();
std::make_heap( sleep_pqueue.begin(), std::make_heap( sleep_pqueue.begin(),
@ -599,19 +692,21 @@ namespace fc {
} }
void wait( const promise_base::ptr& p, const time_point& timeout ) { void wait( const promise_base::ptr& p, const time_point& timeout ) {
if( p->ready() ) return; if( p->ready() )
return;
if( timeout < time_point::now() ) if( timeout < time_point::now() )
FC_THROW_EXCEPTION( timeout_exception, "" ); FC_THROW_EXCEPTION( timeout_exception, "" );
if( !current ) { if( !current )
current = new fc::context(&fc::thread::current()); current = new fc::context(&fc::thread::current());
}
// slog( " %1% blocking on %2%", current, p.get() ); // slog( " %1% blocking on %2%", current, p.get() );
current->add_blocking_promise(p.get(),true); current->add_blocking_promise(p.get(),true);
// if not max timeout, added to sleep pqueue // if not max timeout, added to sleep pqueue
if( timeout != time_point::maximum() ) { if( timeout != time_point::maximum() )
{
current->resume_time = timeout; current->resume_time = timeout;
sleep_pqueue.push_back(current); sleep_pqueue.push_back(current);
std::push_heap( sleep_pqueue.begin(), std::push_heap( sleep_pqueue.begin(),
@ -652,7 +747,7 @@ namespace fc {
{ {
fc::context* next_blocked = (*iter)->next_blocked; fc::context* next_blocked = (*iter)->next_blocked;
(*iter)->next_blocked = nullptr; (*iter)->next_blocked = nullptr;
ready_push_front(*iter); add_context_to_ready_list(*iter);
*iter = next_blocked; *iter = next_blocked;
continue; continue;
} }
@ -664,15 +759,10 @@ namespace fc {
{ {
if ((*sleep_iter)->canceled) if ((*sleep_iter)->canceled)
{ {
bool already_on_ready_list = false; bool already_on_ready_list = std::find(ready_heap.begin(), ready_heap.end(),
for (fc::context* ready_iter = ready_head; ready_iter; ready_iter = ready_iter->next) *sleep_iter) != ready_heap.end();
if (ready_iter == *sleep_iter)
{
already_on_ready_list = true;
break;
}
if (!already_on_ready_list) if (!already_on_ready_list)
ready_push_front(*sleep_iter); add_context_to_ready_list(*sleep_iter);
sleep_iter = sleep_pqueue.erase(sleep_iter); sleep_iter = sleep_pqueue.erase(sleep_iter);
task_removed_from_sleep_pqueue = true; task_removed_from_sleep_pqueue = true;
} }

View file

@ -9,6 +9,7 @@
#include <fc/crypto/hex.hpp> #include <fc/crypto/hex.hpp>
#include <boost/scoped_array.hpp> #include <boost/scoped_array.hpp>
#include <fc/reflect/variant.hpp> #include <fc/reflect/variant.hpp>
#include <algorithm>
namespace fc namespace fc
{ {