FC Updates from BitShares and myself #21

Closed
nathanielhourt wants to merge 687 commits from dapp-support into latest-fc
3 changed files with 46 additions and 63 deletions
Showing only changes of commit cd69d13348 - Show all commits

View file

@ -3,9 +3,9 @@
#include <fc/exception/exception.hpp> #include <fc/exception/exception.hpp>
#include <fc/thread/spin_yield_lock.hpp> #include <fc/thread/spin_yield_lock.hpp>
#include <fc/optional.hpp> #include <fc/optional.hpp>
#include <memory>
#include <boost/atomic.hpp> #include <atomic>
#include <memory>
//#define FC_TASK_NAMES_ARE_MANDATORY 1 //#define FC_TASK_NAMES_ARE_MANDATORY 1
#ifdef FC_TASK_NAMES_ARE_MANDATORY #ifdef FC_TASK_NAMES_ARE_MANDATORY
@ -77,23 +77,22 @@ namespace fc {
void _wait( const microseconds& timeout_us ); void _wait( const microseconds& timeout_us );
void _wait_until( const time_point& timeout_us ); void _wait_until( const time_point& timeout_us );
void _enqueue_thread();
void _dequeue_thread();
void _notify(); void _notify();
void _set_timeout();
void _set_value(const void* v); void _set_value(const void* v);
void _on_complete( detail::completion_handler* c ); void _on_complete( detail::completion_handler* c );
private: private:
void _enqueue_thread();
void _dequeue_thread();
friend class thread; friend class thread;
friend struct context; friend struct context;
friend class thread_d; friend class thread_d;
bool _ready; std::atomic<bool> _ready;
mutable spin_yield_lock _spin_yield; std::atomic<thread*> _blocked_thread;
thread* _blocked_thread; std::atomic<int32_t> _blocked_fiber_count;
unsigned _blocked_fiber_count;
time_point _timeout; time_point _timeout;
fc::exception_ptr _exceptp; fc::exception_ptr _exceptp;
bool _canceled; bool _canceled;
@ -103,7 +102,7 @@ namespace fc {
private: private:
#endif #endif
const char* _desc; const char* _desc;
detail::completion_handler* _compl; std::atomic<detail::completion_handler*> _compl;
}; };
template<typename T = void> template<typename T = void>

View file

@ -4,6 +4,8 @@
#include <fc/fwd.hpp> #include <fc/fwd.hpp>
#include <type_traits> #include <type_traits>
#include <boost/atomic.hpp>
namespace fc { namespace fc {
struct context; struct context;
class spin_lock; class spin_lock;

View file

@ -6,7 +6,6 @@
#include <boost/assert.hpp> #include <boost/assert.hpp>
namespace fc { namespace fc {
promise_base::promise_base( const char* desc ) promise_base::promise_base( const char* desc )
@ -22,6 +21,8 @@ namespace fc {
_compl(nullptr) _compl(nullptr)
{ } { }
promise_base::~promise_base() { }
const char* promise_base::get_desc()const{ const char* promise_base::get_desc()const{
return _desc; return _desc;
} }
@ -34,16 +35,14 @@ namespace fc {
#endif #endif
} }
bool promise_base::ready()const { bool promise_base::ready()const {
return _ready; return _ready.load();
} }
bool promise_base::error()const { bool promise_base::error()const {
{ synchronized(_spin_yield) return std::atomic_load( &_exceptp ) != nullptr;
return _exceptp != nullptr;
}
} }
void promise_base::set_exception( const fc::exception_ptr& e ){ void promise_base::set_exception( const fc::exception_ptr& e ){
_exceptp = e; std::atomic_store( &_exceptp, e );
_set_value(nullptr); _set_value(nullptr);
} }
@ -54,16 +53,17 @@ namespace fc {
_wait_until( time_point::now() + timeout_us ); _wait_until( time_point::now() + timeout_us );
} }
void promise_base::_wait_until( const time_point& timeout_us ){ void promise_base::_wait_until( const time_point& timeout_us ){
{ synchronized(_spin_yield) if( _ready.load() ) {
if( _ready ) { fc::exception_ptr ex = std::atomic_load( &_exceptp );
if( _exceptp ) if( ex )
_exceptp->dynamic_rethrow_exception(); ex->dynamic_rethrow_exception();
return; return;
}
_enqueue_thread();
} }
std::exception_ptr e; _enqueue_thread();
// Need to check _ready again to avoid a race condition.
if( _ready.load() ) return _wait_until( timeout_us ); // this will simply return or throw _exceptp
std::exception_ptr e;
// //
// Create shared_ptr to take ownership of this; i.e. this will // Create shared_ptr to take ownership of this; i.e. this will
// be deleted when p_this goes out of scope. Consequently, // be deleted when p_this goes out of scope. Consequently,
@ -71,9 +71,7 @@ namespace fc {
// before we're done reading/writing instance variables! // before we're done reading/writing instance variables!
// See https://github.com/cryptonomex/graphene/issues/597 // See https://github.com/cryptonomex/graphene/issues/597
// //
ptr p_this = shared_from_this(); ptr p_this = shared_from_this();
try try
{ {
// //
@ -94,61 +92,45 @@ namespace fc {
if( e ) std::rethrow_exception(e); if( e ) std::rethrow_exception(e);
if( _ready ) if( _ready.load() ) return _wait_until( timeout_us ); // this will simply return or throw _exceptp
{
if( _exceptp )
_exceptp->dynamic_rethrow_exception();
return;
}
FC_THROW_EXCEPTION( timeout_exception, "" ); FC_THROW_EXCEPTION( timeout_exception, "" );
} }
void promise_base::_enqueue_thread(){ void promise_base::_enqueue_thread(){
++_blocked_fiber_count; _blocked_fiber_count.fetch_add( 1 );
thread* blocked_thread = _blocked_thread.load();
// only one thread can wait on a promise at any given time // only one thread can wait on a promise at any given time
assert(!_blocked_thread || do
_blocked_thread == &thread::current()); assert( !blocked_thread || blocked_thread == &thread::current() );
_blocked_thread = &thread::current(); while( !_blocked_thread.compare_exchange_weak( blocked_thread, &thread::current() ) );
} }
void promise_base::_dequeue_thread(){ void promise_base::_dequeue_thread(){
synchronized(_spin_yield) if( _blocked_fiber_count.fetch_add( -1 ) == 1 )
if (!--_blocked_fiber_count) _blocked_thread.store( nullptr );
_blocked_thread = nullptr;
} }
void promise_base::_notify(){ void promise_base::_notify(){
// copy _blocked_thread into a local so that if the thread unblocks (e.g., // copy _blocked_thread into a local so that if the thread unblocks (e.g.,
// because of a timeout) before we get a chance to notify it, we won't be // because of a timeout) before we get a chance to notify it, we won't be
// calling notify on a null pointer // calling notify on a null pointer
thread* blocked_thread; thread* blocked_thread = _blocked_thread.load();
{ synchronized(_spin_yield)
blocked_thread = _blocked_thread;
}
if( blocked_thread ) if( blocked_thread )
blocked_thread->notify( shared_from_this() ); blocked_thread->notify( shared_from_this() );
} }
promise_base::~promise_base() { }
void promise_base::_set_timeout(){
if( _ready )
return;
set_exception( std::make_shared<fc::timeout_exception>() );
}
void promise_base::_set_value(const void* s){ void promise_base::_set_value(const void* s){
// slog( "%p == %d", &_ready, int(_ready)); bool ready = false;
// BOOST_ASSERT( !_ready ); if( !_ready.compare_exchange_strong( ready, true ) ) //don't allow promise to be set more than once
{ synchronized(_spin_yield)
if (_ready) //don't allow promise to be set more than once
return; return;
_ready = true; _notify();
} auto* hdl = _compl.load();
_notify(); if( nullptr != hdl )
if( nullptr != _compl ) { hdl->on_complete( s, std::atomic_load( &_exceptp ) );
_compl->on_complete(s,_exceptp);
}
} }
void promise_base::_on_complete( detail::completion_handler* c ) { void promise_base::_on_complete( detail::completion_handler* c ) {
{ synchronized(_spin_yield) auto* hdl = _compl.load();
delete _compl; while( !_compl.compare_exchange_weak( hdl, c ) );
_compl = c; delete hdl;
}
} }
} }