From cd69d13348da2c4d8f4caf8a275b25141bbec2e6 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Mon, 16 Sep 2019 21:02:25 +0200 Subject: [PATCH] Replaced spin_yield_lock with atomics --- include/fc/thread/future.hpp | 19 ++++---- include/fc/thread/task.hpp | 2 + src/thread/future.cpp | 88 ++++++++++++++---------------------- 3 files changed, 46 insertions(+), 63 deletions(-) diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index 4e98c3d..c0db411 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -3,9 +3,9 @@ #include #include #include -#include -#include +#include +#include //#define FC_TASK_NAMES_ARE_MANDATORY 1 #ifdef FC_TASK_NAMES_ARE_MANDATORY @@ -77,23 +77,22 @@ namespace fc { void _wait( const microseconds& timeout_us ); void _wait_until( const time_point& timeout_us ); - void _enqueue_thread(); - void _dequeue_thread(); void _notify(); - void _set_timeout(); void _set_value(const void* v); void _on_complete( detail::completion_handler* c ); private: + void _enqueue_thread(); + void _dequeue_thread(); + friend class thread; friend struct context; friend class thread_d; - bool _ready; - mutable spin_yield_lock _spin_yield; - thread* _blocked_thread; - unsigned _blocked_fiber_count; + std::atomic _ready; + std::atomic _blocked_thread; + std::atomic _blocked_fiber_count; time_point _timeout; fc::exception_ptr _exceptp; bool _canceled; @@ -103,7 +102,7 @@ namespace fc { private: #endif const char* _desc; - detail::completion_handler* _compl; + std::atomic _compl; }; template diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index f7c2ec5..92e1b48 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -4,6 +4,8 @@ #include #include +#include + namespace fc { struct context; class spin_lock; diff --git a/src/thread/future.cpp b/src/thread/future.cpp index c9dbffa..811378c 100644 --- a/src/thread/future.cpp +++ b/src/thread/future.cpp @@ -6,7 +6,6 @@ #include - namespace fc { promise_base::promise_base( const char* desc ) @@ -22,6 +21,8 @@ namespace fc { _compl(nullptr) { } + promise_base::~promise_base() { } + const char* promise_base::get_desc()const{ return _desc; } @@ -34,16 +35,14 @@ namespace fc { #endif } bool promise_base::ready()const { - return _ready; + return _ready.load(); } bool promise_base::error()const { - { synchronized(_spin_yield) - return _exceptp != nullptr; - } + return std::atomic_load( &_exceptp ) != nullptr; } void promise_base::set_exception( const fc::exception_ptr& e ){ - _exceptp = e; + std::atomic_store( &_exceptp, e ); _set_value(nullptr); } @@ -54,16 +53,17 @@ namespace fc { _wait_until( time_point::now() + timeout_us ); } void promise_base::_wait_until( const time_point& timeout_us ){ - { synchronized(_spin_yield) - if( _ready ) { - if( _exceptp ) - _exceptp->dynamic_rethrow_exception(); - return; - } - _enqueue_thread(); + if( _ready.load() ) { + fc::exception_ptr ex = std::atomic_load( &_exceptp ); + if( ex ) + ex->dynamic_rethrow_exception(); + return; } - std::exception_ptr e; + _enqueue_thread(); + // Need to check _ready again to avoid a race condition. + if( _ready.load() ) return _wait_until( timeout_us ); // this will simply return or throw _exceptp + std::exception_ptr e; // // Create shared_ptr to take ownership of this; i.e. this will // be deleted when p_this goes out of scope. Consequently, @@ -71,9 +71,7 @@ namespace fc { // before we're done reading/writing instance variables! // See https://github.com/cryptonomex/graphene/issues/597 // - ptr p_this = shared_from_this(); - try { // @@ -94,61 +92,45 @@ namespace fc { if( e ) std::rethrow_exception(e); - if( _ready ) - { - if( _exceptp ) - _exceptp->dynamic_rethrow_exception(); - return; - } + if( _ready.load() ) return _wait_until( timeout_us ); // this will simply return or throw _exceptp + FC_THROW_EXCEPTION( timeout_exception, "" ); } void promise_base::_enqueue_thread(){ - ++_blocked_fiber_count; + _blocked_fiber_count.fetch_add( 1 ); + thread* blocked_thread = _blocked_thread.load(); // only one thread can wait on a promise at any given time - assert(!_blocked_thread || - _blocked_thread == &thread::current()); - _blocked_thread = &thread::current(); + do + assert( !blocked_thread || blocked_thread == &thread::current() ); + while( !_blocked_thread.compare_exchange_weak( blocked_thread, &thread::current() ) ); } void promise_base::_dequeue_thread(){ - synchronized(_spin_yield) - if (!--_blocked_fiber_count) - _blocked_thread = nullptr; + if( _blocked_fiber_count.fetch_add( -1 ) == 1 ) + _blocked_thread.store( nullptr ); } void promise_base::_notify(){ // copy _blocked_thread into a local so that if the thread unblocks (e.g., // because of a timeout) before we get a chance to notify it, we won't be // calling notify on a null pointer - thread* blocked_thread; - { synchronized(_spin_yield) - blocked_thread = _blocked_thread; - } + thread* blocked_thread = _blocked_thread.load(); if( blocked_thread ) blocked_thread->notify( shared_from_this() ); } - promise_base::~promise_base() { } - void promise_base::_set_timeout(){ - if( _ready ) - return; - set_exception( std::make_shared() ); - } + void promise_base::_set_value(const void* s){ - // slog( "%p == %d", &_ready, int(_ready)); -// BOOST_ASSERT( !_ready ); - { synchronized(_spin_yield) - if (_ready) //don't allow promise to be set more than once + bool ready = false; + if( !_ready.compare_exchange_strong( ready, true ) ) //don't allow promise to be set more than once return; - _ready = true; - } - _notify(); - if( nullptr != _compl ) { - _compl->on_complete(s,_exceptp); - } + _notify(); + auto* hdl = _compl.load(); + if( nullptr != hdl ) + hdl->on_complete( s, std::atomic_load( &_exceptp ) ); } + void promise_base::_on_complete( detail::completion_handler* c ) { - { synchronized(_spin_yield) - delete _compl; - _compl = c; - } + auto* hdl = _compl.load(); + while( !_compl.compare_exchange_weak( hdl, c ) ); + delete hdl; } }