From 3131c1df430d55626314c82eb23f6ee83b7af44b Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Thu, 27 Sep 2018 15:57:46 +0200 Subject: [PATCH 01/20] Implement helper function for real parallel execution --- include/fc/thread/parallel.hpp | 59 ++++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/thread/parallel_tests.cpp | 75 +++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+) create mode 100644 include/fc/thread/parallel.hpp create mode 100644 tests/thread/parallel_tests.cpp diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp new file mode 100644 index 0000000..eebfaeb --- /dev/null +++ b/include/fc/thread/parallel.hpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include +#include + +namespace fc { + + namespace detail { + template + class parallel_completion_handler { + public: + parallel_completion_handler( Task* task ) : _task(task) {} + void operator()() { _task->run(); } + private: + Task* _task; + }; + } + + /** + * Calls function f in a separate thread and returns a future + * that can be used to wait on the result. + * + * @param f the operation to perform + */ + template + auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future { + typedef decltype(f()) Result; + typedef typename fc::deduce::type FunctorType; + fc::task* tsk = + new fc::task( fc::forward(f), desc ); + fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); + fc::asio::default_io_service().post( detail::parallel_completion_handler>( tsk ) ); + return r; + } +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 600f6f8..17c8e5f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -53,6 +53,7 @@ add_executable( all_tests all_tests.cpp network/http/websocket_test.cpp thread/task_cancel.cpp thread/thread_tests.cpp + thread/parallel_tests.cpp bloom_test.cpp real128_test.cpp serialization_test.cpp diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp new file mode 100644 index 0000000..67ec6ae --- /dev/null +++ b/tests/thread/parallel_tests.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include + +#include + +BOOST_AUTO_TEST_SUITE(parallel_tests) + +BOOST_AUTO_TEST_CASE( do_nothing_parallel ) +{ + std::vector> results; + results.reserve( 20 ); + for( size_t i = 0; i < results.capacity(); i++ ) + results.push_back( fc::do_parallel( [i] () { std::cout << i << ","; } ) ); + for( auto& result : results ) + result.wait(); + std::cout << "\n"; +} + +BOOST_AUTO_TEST_CASE( do_something_parallel ) +{ + struct result { + boost::thread::id thread_id; + int call_count; + }; + + std::vector> results; + results.reserve( 20 ); + boost::thread_specific_ptr tls; + for( size_t i = 0; i < results.capacity(); i++ ) + results.push_back( fc::do_parallel( [i,&tls] () { + if( !tls.get() ) { tls.reset( new int(0) ); } + result res = { boost::this_thread::get_id(), (*tls.get())++ }; + return res; + } ) ); + + std::map> results_by_thread; + for( auto& res : results ) + { + result r = res.wait(); + results_by_thread[r.thread_id].push_back( r.call_count ); + } + + BOOST_CHECK( results_by_thread.size() > 1 ); // require execution by more than 1 thread + for( auto& pair : results_by_thread ) + { // check that thread_local_storage counter works + std::sort( pair.second.begin(), pair.second.end() ); + for( size_t i = 0; i < pair.second.size(); i++ ) + BOOST_CHECK_EQUAL( i, pair.second[i] ); + } +} + +BOOST_AUTO_TEST_SUITE_END() From 40b2843d15272b814d5f8950d35c26aa2608ca6a Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Thu, 27 Sep 2018 16:51:04 +0200 Subject: [PATCH 02/20] Added test case for parallel hashing --- include/fc/crypto/sha1.hpp | 5 ++++ tests/thread/parallel_tests.cpp | 50 +++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/include/fc/crypto/sha1.hpp b/include/fc/crypto/sha1.hpp index a269797..01c6ca8 100644 --- a/include/fc/crypto/sha1.hpp +++ b/include/fc/crypto/sha1.hpp @@ -2,6 +2,8 @@ #include #include +#include + namespace fc{ class sha1 @@ -82,3 +84,6 @@ namespace std } }; } + +#include +FC_REFLECT_TYPENAME( fc::sha1 ) diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp index 67ec6ae..b5b56f0 100644 --- a/tests/thread/parallel_tests.cpp +++ b/tests/thread/parallel_tests.cpp @@ -24,7 +24,13 @@ #include +#include +#include +#include +#include +#include #include +#include BOOST_AUTO_TEST_SUITE(parallel_tests) @@ -72,4 +78,48 @@ BOOST_AUTO_TEST_CASE( do_something_parallel ) } } +const std::string TEXT = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"$%&/()=?,.-#+´{[]}`*'_:;<>|"; + +template +class hash_test { + public: + std::string _hashname = fc::get_typename::name(); + + void run_single_threaded() { + const std::string first = Hash::hash(TEXT).str(); + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 1000; i++ ) + BOOST_CHECK_EQUAL( first, Hash::hash(TEXT).str() ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} single-threaded ${h}'s in ${t}µs", ("c",1000)("h",_hashname)("t",end-start) ); + } + + void run_multi_threaded() { + const std::string first = Hash::hash(TEXT).str(); + std::vector> results; + results.reserve( 10000 ); + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 10000; i++ ) + results.push_back( fc::do_parallel( [] () { return Hash::hash(TEXT).str(); } ) ); + for( auto& result: results ) + BOOST_CHECK_EQUAL( first, result.wait() ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} multi-threaded ${h}'s in ${t}µs", ("c",10000)("h",_hashname)("t",end-start) ); + } + + void run() { + run_single_threaded(); + run_multi_threaded(); + } +}; + +BOOST_AUTO_TEST_CASE( hash_parallel ) +{ + hash_test().run(); + hash_test().run(); + hash_test().run(); + hash_test().run(); + hash_test().run(); +} + BOOST_AUTO_TEST_SUITE_END() From 6fe8f1d6e6c18c5f4af687da6800320d0a966447 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Thu, 27 Sep 2018 17:24:21 +0200 Subject: [PATCH 03/20] Added parallel sign/verify test --- tests/thread/parallel_tests.cpp | 55 +++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp index b5b56f0..e1ad2b7 100644 --- a/tests/thread/parallel_tests.cpp +++ b/tests/thread/parallel_tests.cpp @@ -24,6 +24,7 @@ #include +#include #include #include #include @@ -122,4 +123,58 @@ BOOST_AUTO_TEST_CASE( hash_parallel ) hash_test().run(); } +BOOST_AUTO_TEST_CASE( sign_verify_parallel ) +{ + const fc::sha256 HASH = fc::sha256::hash(TEXT); + + std::vector keys; + keys.reserve(1000); + for( int i = 0; i < 1000; i++ ) + keys.push_back( fc::ecc::private_key::regenerate( fc::sha256::hash( TEXT + fc::to_string(i) ) ) ); + + std::vector sigs; + sigs.reserve( 10 * keys.size() ); + { + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 10; i++ ) + for( const auto& key: keys ) + sigs.push_back( key.sign_compact( HASH ) ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} single-threaded signatures in ${t}µs", ("c",sigs.size())("t",end-start) ); + } + + { + fc::time_point start = fc::time_point::now(); + for( size_t i = 0; i < sigs.size(); i++ ) + BOOST_CHECK( keys[i % keys.size()].get_public_key() == fc::ecc::public_key( sigs[i], HASH ) ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} single-threaded verifies in ${t}µs", ("c",sigs.size())("t",end-start) ); + } + + { + std::vector> results; + results.reserve( 10 * keys.size() ); + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 10; i++ ) + for( const auto& key: keys ) + results.push_back( fc::do_parallel( [&key,&HASH] () { return key.sign_compact( HASH ); } ) ); + for( auto& res : results ) + res.wait(); + fc::time_point end = fc::time_point::now(); + ilog( "${c} multi-threaded signatures in ${t}µs", ("c",sigs.size())("t",end-start) ); + } + + { + std::vector> results; + results.reserve( sigs.size() ); + fc::time_point start = fc::time_point::now(); + for( const auto& sig: sigs ) + results.push_back( fc::do_parallel( [&sig,&HASH] () { return fc::ecc::public_key( sig, HASH ); } ) ); + for( size_t i = 0; i < results.size(); i++ ) + BOOST_CHECK( keys[i % keys.size()].get_public_key() == results[i].wait() ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} multi-threaded verifies in ${t}µs", ("c",sigs.size())("t",end-start) ); + } +} + BOOST_AUTO_TEST_SUITE_END() From 34a2820c90fb88cf048650855b66ddfd654512c1 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Thu, 27 Sep 2018 17:28:45 +0200 Subject: [PATCH 04/20] Added warning --- include/fc/thread/parallel.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index eebfaeb..3425b85 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -27,6 +27,10 @@ #include #include +/* NOTE: the methods in this header are NOT to be mixed up with fc's + * multithreading. Parallel functions MUST NOT call fc::thread::yield NOR + * use fc's mutexes etc.! + */ namespace fc { namespace detail { From ef3d36547e3dd2777f6b11d6aaf6ff206184c850 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Thu, 27 Sep 2018 18:08:00 +0200 Subject: [PATCH 05/20] Removed unused file --- include/fc/container/deque.hpp | 12 ------------ 1 file changed, 12 deletions(-) delete mode 100644 include/fc/container/deque.hpp diff --git a/include/fc/container/deque.hpp b/include/fc/container/deque.hpp deleted file mode 100644 index 6a05dc8..0000000 --- a/include/fc/container/deque.hpp +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include -#include - -namespace fc { - namespace raw { - - - } // namespace raw - -} // namespace fc From 9954a3775c32ad7fdc5aa5000287a5922d162e24 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Mon, 1 Oct 2018 17:31:04 +0200 Subject: [PATCH 06/20] Added constructor for auto-fulfillment --- include/fc/thread/future.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index cc2c317..0d0e2e7 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -21,7 +21,6 @@ #endif namespace fc { - class abstract_thread; struct void_t{}; class priority; class thread; @@ -146,7 +145,9 @@ namespace fc { public: typedef fc::shared_ptr< promise > ptr; promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} - //promise( const void_t& ){ set_value(); } + promise( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG ){ + if( fulfilled ) set_value(); + } void wait(const microseconds& timeout = microseconds::maximum() ){ this->_wait( timeout ); From 21724face73fbd76f04012f0c15ea05719a04e75 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Mon, 1 Oct 2018 17:31:48 +0200 Subject: [PATCH 07/20] Give asio threads separate names for better debugging --- include/fc/asio.hpp | 5 ++--- src/asio.cpp | 9 +++------ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index 3ad05b2..a2a0fb7 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -19,7 +19,6 @@ namespace asio { * @brief internal implementation types/methods for fc::asio */ namespace detail { - using namespace fc; class read_write_handler { @@ -59,14 +58,14 @@ namespace asio { bool operator()( C& c, bool s ) { c.non_blocking(s); return true; } }; - #if WIN32 // windows stream handles do not support non blocking! +#if WIN32 // windows stream handles do not support non blocking! template<> struct non_blocking { typedef boost::asio::windows::stream_handle C; bool operator()( C& ) { return false; } bool operator()( C&, bool ) { return false; } }; - #endif +#endif } // end of namespace detail /*** diff --git a/src/asio.cpp b/src/asio.cpp index 909f4a5..cec7de6 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -58,7 +58,6 @@ namespace fc { } else { - //elog( "${message} ", ("message", boost::system::system_error(ec).what())); p->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); } @@ -83,8 +82,6 @@ namespace fc { } p->set_value( eps ); } else { - //elog( "%s", boost::system::system_error(ec).what() ); - //p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); p->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "process exited with: ${message} ", @@ -125,9 +122,9 @@ namespace fc { for( uint16_t i = 0; i < this->num_io_threads; ++i ) { - asio_threads.push_back( new boost::thread( [=]() + asio_threads.push_back( new boost::thread( [i,this]() { - fc::thread::current().set_name("asio"); + fc::thread::current().set_name( "fc::asio worker #" + fc::to_string(i) ); BOOST_SCOPE_EXIT(void) { @@ -194,7 +191,7 @@ namespace fc { promise >::ptr p( new promise >("tcp::resolve completion") ); res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), boost::bind( detail::resolve_handler, p, _1, _2 ) ); - return p->wait();; + return p->wait(); } FC_RETHROW_EXCEPTIONS(warn, "") } From 7e8debbad41b17420535c1faa787127c1db4aed2 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Mon, 1 Oct 2018 17:32:21 +0200 Subject: [PATCH 08/20] Want to see thread names also in RelWithDebInfo build --- src/thread/thread.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 7ca01f2..4da20b3 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -34,7 +34,7 @@ static void set_thread_name(const char* threadName) { } } -#elif defined(__linux__) && !defined(NDEBUG) +#elif defined(__linux__) # include static void set_thread_name(const char* threadName) { From fa7f6af01f37c321385cad5f3b4ac866eb5e8834 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Wed, 3 Oct 2018 10:31:57 +0200 Subject: [PATCH 09/20] Delete broken (wrt fc::current_thread()) move stuff --- include/fc/thread/thread.hpp | 6 +++--- src/thread/thread.cpp | 10 ---------- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index aea55da..933832f 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -22,8 +22,8 @@ namespace fc { class thread { public: thread( const std::string& name = "" ); - thread( thread&& m ); - thread& operator=(thread&& t ); + thread( thread&& m ) = delete; + thread& operator=(thread&& t ) = delete; /** * Returns the current thread. @@ -130,7 +130,7 @@ namespace fc { return wait_any_until(fc::move(proms), fc::time_point::now()+timeout_us ); } private: - thread( class thread_d* ); + thread( class thread_d* ); // parameter is ignored, will create a new thread_d friend class promise_base; friend class task_base; friend class thread_d; diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 4da20b3..dd1dbdc 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -98,16 +98,6 @@ namespace fc { my = new thread_d(*this); } - thread::thread( thread&& m ) { - my = m.my; - m.my = 0; - } - - thread& thread::operator=(thread&& t ) { - fc_swap(t.my,my); - return *this; - } - thread::~thread() { //wlog( "my ${n}", ("n",name()) ); if( my ) From 9d547427417b8ad2cbe7917600a043c89dbc6d0e Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Wed, 3 Oct 2018 10:47:02 +0200 Subject: [PATCH 10/20] Added a mechanism to get notifications when a thread is idle --- include/fc/thread/task.hpp | 2 ++ include/fc/thread/thread.hpp | 20 ++++++++++- src/thread/thread.cpp | 65 ++++++++++++++++-------------------- src/thread/thread_d.hpp | 25 ++++++++++++-- 4 files changed, 72 insertions(+), 40 deletions(-) diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index dd52b77..6c79d71 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -25,6 +25,7 @@ namespace fc { }; void* get_task_specific_data(unsigned slot); void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + class idle_guard; } class task_base : virtual public promise_base { @@ -53,6 +54,7 @@ namespace fc { // thread/thread_private friend class thread; friend class thread_d; + friend class detail::idle_guard; fwd _spinlock; // avoid rtti info for every possible functor... diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index 933832f..bbd8317 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -12,6 +12,7 @@ namespace fc { namespace detail { + class pool_impl; void* get_thread_specific_data(unsigned slot); void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); unsigned get_next_unused_task_storage_slot(); @@ -19,9 +20,25 @@ namespace fc { void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); } + /** Instances of this class can be used to get notifications when a thread is + * (or is no longer) idle. + */ + class thread_idle_notifier { + public: + /** This method is called when the thread is idle. If it returns a + * task_base it will be queued and executed immediately. + * @return a task to execute, or nullptr + */ + virtual task_base* idle() = 0; + /** This method is called when the thread is no longer idle, e. g. after + * it has woken up due to a timer or signal. + */ + virtual void busy() = 0; + }; + class thread { public: - thread( const std::string& name = "" ); + thread( const std::string& name = "", thread_idle_notifier* notifier = 0 ); thread( thread&& m ) = delete; thread& operator=(thread&& t ) = delete; @@ -135,6 +152,7 @@ namespace fc { friend class task_base; friend class thread_d; friend class mutex; + friend class detail::pool_impl; friend void* detail::get_thread_specific_data(unsigned slot); friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); friend unsigned detail::get_next_unused_task_storage_slot(); diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index dd1dbdc..73fa303 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -62,20 +62,20 @@ namespace fc { } thread*& current_thread() { - #ifdef _MSC_VER +#ifdef _MSC_VER static __declspec(thread) thread* t = NULL; - #else +#else static __thread thread* t = NULL; - #endif +#endif return t; } - thread::thread( const std::string& name ) { + thread::thread( const std::string& name, thread_idle_notifier* notifier ) { promise::ptr p(new promise("thread start")); - boost::thread* t = new boost::thread( [this,p,name]() { + boost::thread* t = new boost::thread( [this,p,name,notifier]() { try { set_thread_name(name.c_str()); // set thread's name for the debugger to display - this->my = new thread_d(*this); + this->my = new thread_d( *this, notifier ); current_thread() = this; p->set_value(); exec(); @@ -85,26 +85,19 @@ namespace fc { } catch ( ... ) { wlog( "unhandled exception" ); p->set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); - //assert( !"unhandled exception" ); - //elog( "Caught unhandled exception %s", boost::current_exception_diagnostic_information().c_str() ); } } ); p->wait(); my->boost_thread = t; my->name = name; - //wlog("name:${n} tid:${tid}", ("n", name)("tid", (uintptr_t)my->boost_thread->native_handle()) ); } thread::thread( thread_d* ) { my = new thread_d(*this); } thread::~thread() { - //wlog( "my ${n}", ("n",name()) ); if( my ) - { - // wlog( "calling quit() on ${n}",("n",my->name) ); quit(); - } delete my; } @@ -129,7 +122,7 @@ namespace fc { { if (!is_current()) { - async([=](){ set_name(n); }, "set_name").wait(); + async([this,n](){ set_name(n); }, "set_name").wait(); return; } my->name = n; @@ -152,17 +145,13 @@ namespace fc { if( !is_current() ) { auto t = my->boost_thread; - async( [=](){quit();}, "thread::quit" );//.wait(); + async( [this](){quit();}, "thread::quit" ); if( t ) - { - //wlog("destroying boost thread ${tid}",("tid",(uintptr_t)my->boost_thread->native_handle())); t->join(); - } return; } my->done = true; - // wlog( "${s}", ("s",name()) ); // We are quiting from our own thread... // break all promises, thread quit! @@ -173,20 +162,14 @@ namespace fc { { fc::context* n = cur->next; // this will move the context into the ready list. - //cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) ); - //cur->set_exception_on_blocking_promises( thread_quit() ); cur->set_exception_on_blocking_promises( std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) ); cur = n; } if( my->blocked ) - { - //wlog( "still blocking... whats up with that?"); debug( "on quit" ); - } } BOOST_ASSERT( my->blocked == 0 ); - //my->blocked = 0; for (task_base* unstarted_task : my->task_pqueue) unstarted_task->set_exception(std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting"))); @@ -314,7 +297,6 @@ namespace fc { if( p[i]->ready() ) return i; - //BOOST_THROW_EXCEPTION( wait_any_error() ); return -1; } @@ -330,8 +312,6 @@ namespace fc { void thread::async_task( task_base* t, const priority& p, const time_point& tp ) { assert(my); t->_when = tp; - // slog( "when %lld", t->_when.time_since_epoch().count() ); - // slog( "delay %lld", (tp - fc::time_point::now()).count() ); task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed); do { t->_next = stale_head; }while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) ); @@ -381,7 +361,6 @@ namespace fc { if( !my->current ) my->current = new fc::context(&fc::thread::current()); - //slog( " %1% blocking on %2%", my->current, p.get() ); my->current->add_blocking_promise(p.get(), true); // if not max timeout, added to sleep pqueue @@ -394,15 +373,10 @@ namespace fc { sleep_priority_less() ); } - // elog( "blocking %1%", my->current ); my->add_to_blocked( my->current ); - // my->debug("swtiching fibers..." ); - my->start_next_fiber(); - // slog( "resuming %1%", my->current ); - //slog( " %1% unblocking blocking on %2%", my->current, p.get() ); my->current->remove_blocking_promise(p.get()); my->check_fiber_exceptions(); @@ -410,7 +384,6 @@ namespace fc { void thread::notify( const promise_base::ptr& p ) { - //slog( "this %p my %p", this, my ); BOOST_ASSERT(p->ready()); if( !is_current() ) { @@ -473,7 +446,7 @@ namespace fc { void thread::notify_task_has_been_canceled() { - async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); + async( [this](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); } void thread::unblock(fc::context* c) @@ -481,6 +454,26 @@ namespace fc { my->unblock(c); } + namespace detail { + idle_guard::idle_guard( thread_d* t ) : notifier(t->notifier) + { + if( notifier ) + { + task_base* work = notifier->idle(); + if( work ) + { + task_base* stale_head = t->task_in_queue.load(boost::memory_order_relaxed); + do { + work->_next = stale_head; + } while( !t->task_in_queue.compare_exchange_weak( stale_head, work, boost::memory_order_release ) ); + } + } + } + idle_guard::~idle_guard() + { + if( notifier ) notifier->busy(); + } + } #ifdef _MSC_VER /* support for providing a structured exception handler for async tasks */ diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 6ad9421..d07e3f9 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -15,12 +15,23 @@ namespace fc { return a->resume_time > b->resume_time; } }; + + namespace detail { + class idle_guard { + public: + idle_guard( thread_d* t ); + ~idle_guard(); + private: + thread_idle_notifier* notifier; + }; + } + class thread_d { public: using context_pair = std::pair; - thread_d(fc::thread& s) + thread_d( fc::thread& s, thread_idle_notifier* n = 0 ) :self(s), boost_thread(0), task_in_queue(0), next_posted_num(1), @@ -28,7 +39,8 @@ namespace fc { current(0), pt_head(0), blocked(0), - next_unused_task_storage_slot(0) + next_unused_task_storage_slot(0), + notifier(n) #ifndef NDEBUG ,non_preemptable_scope_count(0) #endif @@ -98,6 +110,8 @@ namespace fc { std::vector non_task_specific_data; unsigned next_unused_task_storage_slot; + thread_idle_notifier *notifier; + #ifndef NDEBUG unsigned non_preemptable_scope_count; #endif @@ -585,6 +599,11 @@ namespace fc { if( done ) return; + + detail::idle_guard guard( this ); + if( task_in_queue.load(boost::memory_order_relaxed) ) + continue; + if( timeout_time == time_point::maximum() ) task_ready.wait( lock ); else if( timeout_time != time_point::min() ) @@ -666,7 +685,7 @@ namespace fc { { if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); }, "thread_d::unblock" ); + self.async( [this,c](){ unblock(c); }, "thread_d::unblock" ); return; } From afcb1e3543c04c99d8528781e401af4aec974866 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Wed, 3 Oct 2018 10:50:33 +0200 Subject: [PATCH 11/20] Moved do_parallel to fc::thread-based worker pool implementation --- CMakeLists.txt | 1 + include/fc/asio.hpp | 3 +- include/fc/thread/parallel.hpp | 25 +++--- src/asio.cpp | 12 +-- src/thread/parallel.cpp | 154 +++++++++++++++++++++++++++++++++ tests/io/tcp_test.cpp | 4 - 6 files changed, 177 insertions(+), 22 deletions(-) create mode 100644 src/thread/parallel.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 7b76491..23bf263 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -217,6 +217,7 @@ set( fc_sources src/thread/spin_lock.cpp src/thread/spin_yield_lock.cpp src/thread/mutex.cpp + src/thread/parallel.cpp src/thread/non_preemptable_scope_check.cpp src/asio.cpp src/string.cpp diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index a2a0fb7..3d11a38 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -77,7 +77,8 @@ namespace asio { public: default_io_service_scope(); ~default_io_service_scope(); - static void set_num_threads(uint16_t num_threads); + static void set_num_threads(uint16_t num_threads); + static uint16_t get_num_threads(); boost::asio::io_service* io; private: std::vector asio_threads; diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index 3425b85..c56047b 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -25,23 +25,24 @@ #pragma once #include +#include #include -/* NOTE: the methods in this header are NOT to be mixed up with fc's - * multithreading. Parallel functions MUST NOT call fc::thread::yield NOR - * use fc's mutexes etc.! - */ namespace fc { namespace detail { - template - class parallel_completion_handler { - public: - parallel_completion_handler( Task* task ) : _task(task) {} - void operator()() { _task->run(); } - private: - Task* _task; + class pool_impl; + + class worker_pool { + public: + worker_pool(); + ~worker_pool(); + void post( task_base* task ); + private: + pool_impl* my; }; + + worker_pool& get_worker_pool(); } /** @@ -57,7 +58,7 @@ namespace fc { fc::task* tsk = new fc::task( fc::forward(f), desc ); fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - fc::asio::default_io_service().post( detail::parallel_completion_handler>( tsk ) ); + detail::get_worker_pool().post( tsk ); return r; } } diff --git a/src/asio.cpp b/src/asio.cpp index cec7de6..0716bad 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -101,10 +101,12 @@ namespace fc { * @param num_threads the number of threads */ void default_io_service_scope::set_num_threads(uint16_t num_threads) { - FC_ASSERT(fc::asio::default_io_service_scope::num_io_threads == 0); - fc::asio::default_io_service_scope::num_io_threads = num_threads; + FC_ASSERT(num_io_threads == 0); + num_io_threads = num_threads; } + uint16_t default_io_service_scope::get_num_threads() { return num_io_threads; } + /*** * Default constructor */ @@ -113,14 +115,14 @@ namespace fc { io = new boost::asio::io_service(); the_work = new boost::asio::io_service::work(*io); - if (this->num_io_threads == 0) + if( num_io_threads == 0 ) { // the default was not set by the configuration. Determine a good // number of threads. Minimum of 8, maximum of hardware_concurrency - this->num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u ); + num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u ); } - for( uint16_t i = 0; i < this->num_io_threads; ++i ) + for( uint16_t i = 0; i < num_io_threads; ++i ) { asio_threads.push_back( new boost::thread( [i,this]() { diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp new file mode 100644 index 0000000..046b283 --- /dev/null +++ b/src/thread/parallel.cpp @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include +#include +#include + +#include +#include + +namespace fc { + namespace detail { + class pool_impl; + + class idle_notifier_impl : public thread_idle_notifier + { + public: + idle_notifier_impl() + { + is_idle.store(false); + } + + idle_notifier_impl( const idle_notifier_impl& copy ) + { + id = copy.id; + my_pool = copy.my_pool; + is_idle.store( copy.is_idle.load() ); + } + + virtual task_base* idle(); + virtual void busy() + { + is_idle.store(false); + } + + uint32_t id; + pool_impl* my_pool; + boost::atomic is_idle; + }; + + class pool_impl + { + public: + pool_impl( const uint16_t num_threads ) + { + notifiers.resize( num_threads ); + threads.reserve( num_threads ); + for( uint32_t i = 0; i < num_threads; i++ ) + { + notifiers[i].id = i; + notifiers[i].my_pool = this; + threads.push_back( new thread( "pool worker " + fc::to_string(i), ¬ifiers[i] ) ); + } + } + ~pool_impl() + { + for( thread* t : threads) + delete t; // also calls quit() + waiting_tasks.consume_all( [] ( task_base* t ) { + t->cancel( "thread pool quitting" ); + }); + } + + void post( task_base* task ) + { + idle_notifier_impl* ini; + while( idle_threads.pop( ini ) ) + if( ini->is_idle.exchange( false ) ) + { // minor race condition here, a thread might receive a task while it's busy + threads[ini->id]->async_task( task, priority() ); + return; + } + boost::unique_lock lock(pool_lock); + while( idle_threads.pop( ini ) ) + if( ini->is_idle.exchange( false ) ) + { // minor race condition here, a thread might receive a task while it's busy + threads[ini->id]->async_task( task, priority() ); + return; + } + waiting_tasks.push( task ); + } + + task_base* enqueue_idle_thread( idle_notifier_impl* ini ) + { + task_base* task; + if( waiting_tasks.pop( task ) ) + return task; + fc::unique_lock lock(pool_lock); + if( waiting_tasks.pop( task ) ) + return task; + idle_threads.push( ini ); + return 0; + } + private: + std::vector notifiers; + std::vector threads; + boost::lockfree::queue idle_threads; + boost::lockfree::queue waiting_tasks; + fc::spin_yield_lock pool_lock; + }; + + task_base* idle_notifier_impl::idle() + { + is_idle.store( true ); + task_base* result = my_pool->enqueue_idle_thread( this ); + if( result ) is_idle.store( false ); + return result; + } + + worker_pool::worker_pool() + { + fc::asio::default_io_service(); + my = new pool_impl( fc::asio::default_io_service_scope::get_num_threads() ); + } + + worker_pool::~worker_pool() + { + delete my; + } + + void worker_pool::post( task_base* task ) + { + my->post( task ); + } + + worker_pool& get_worker_pool() + { + static worker_pool the_pool; + return the_pool; + } + } +} diff --git a/tests/io/tcp_test.cpp b/tests/io/tcp_test.cpp index 98ef876..0a43975 100644 --- a/tests/io/tcp_test.cpp +++ b/tests/io/tcp_test.cpp @@ -18,10 +18,6 @@ BOOST_AUTO_TEST_CASE(tcpconstructor_test) class my_io_class : public fc::asio::default_io_service_scope { public: - uint16_t get_num_threads() - { - return fc::asio::default_io_service_scope::num_io_threads; - } static void reset_num_threads() { fc::asio::default_io_service_scope::num_io_threads = 0; } }; From b0f4e55aee0702df571702629f468a428369745e Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Thu, 4 Oct 2018 14:29:59 +0200 Subject: [PATCH 12/20] --list-content doesnt work properly until boost-1.59 --- tests/run-parallel-tests.sh | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/tests/run-parallel-tests.sh b/tests/run-parallel-tests.sh index ac7eccc..adc3eb2 100755 --- a/tests/run-parallel-tests.sh +++ b/tests/run-parallel-tests.sh @@ -5,14 +5,26 @@ if [ "$#" != 1 ]; then exit 1 fi -"$1" --list_content 2>&1 \ - | grep '\*$' \ - | sed 's=\*$==;s=^ =/=' \ - | while read t; do +CACHE="$( find . -name CMakeCache.txt )" +if [ "$CACHE" != "" ]; then + BOOST_INC="$( grep Boost_INCLUDE_DIR:PATH "$CACHE" | cut -d= -f 2 )" + if [ "$BOOST_INC" != "" ]; then + BOOST_VERSION="$( grep '^#define *BOOST_VERSION ' "$BOOST_INC/boost/version.hpp" | sed 's=^.* ==' )" + fi +fi + +if [ "$BOOST_VERSION" = "" -o "$BOOST_VERSION" -lt 105900 ]; then + echo "Boost version '$BOOST_VERSION' - executing tests serially" + "$1" +else + "$1" --list_content 2>&1 \ + | grep '\*$' \ + | sed 's=\*$==;s=^ =/=' \ + | while read t; do case "$t" in /*) echo "$pre$t"; ;; *) pre="$t"; ;; esac - done \ - | parallel echo Running {}\; "$1" -t {} - + done \ + | parallel echo Running {}\; "$1" -t {} +fi From e336b0bb5c412c8bd53b665a7a37036e2b290dca Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Fri, 5 Oct 2018 15:02:31 +0200 Subject: [PATCH 13/20] Added serial_valve --- include/fc/thread/parallel.hpp | 40 +++++++++ src/thread/parallel.cpp | 46 +++++++++- tests/thread/parallel_tests.cpp | 145 ++++++++++++++++++++++++++++++++ 3 files changed, 230 insertions(+), 1 deletion(-) diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index c56047b..3118703 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -45,6 +45,46 @@ namespace fc { worker_pool& get_worker_pool(); } + class serial_valve { + private: + class ticket_guard { + public: + ticket_guard( boost::atomic*>& latch ); + ~ticket_guard(); + void wait_for_my_turn(); + private: + promise* my_promise; + future* ticket; + }; + + friend class ticket_guard; + boost::atomic*> latch; + + public: + serial_valve(); + ~serial_valve(); + + /** Executes f1() then f2(). + * For any two calls do_serial(f1,f2) and do_serial(f1',f2') where + * do_serial(f1,f2) is invoked before do_serial(f1',f2'), it is + * guaranteed that f2' will be executed after f2 has completed. Failure + * of either function counts as completion of both. + * If f1 throws then f2 will not be invoked. + * + * @param f1 a functor to invoke + * @param f2 a functor to invoke + * @return the return value of f2() + */ + template + auto do_serial( const Functor1& f1, const Functor2& f2 ) -> decltype(f2()) + { + ticket_guard guard( latch ); + f1(); + guard.wait_for_my_turn(); + return f2(); + } + }; + /** * Calls function f in a separate thread and returns a future * that can be used to wait on the result. diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp index 046b283..5e16961 100644 --- a/src/thread/parallel.cpp +++ b/src/thread/parallel.cpp @@ -151,4 +151,48 @@ namespace fc { return the_pool; } } -} + + serial_valve::ticket_guard::ticket_guard( boost::atomic*>& latch ) + { + my_promise = new promise(); + future* my_future = new future( promise::ptr( my_promise, true ) ); + try + { + do + { + ticket = latch.load(); + FC_ASSERT( ticket, "Valve is shutting down!" ); + } + while( !latch.compare_exchange_weak( ticket, my_future ) ); + } + catch (...) + { + delete my_future; // this takes care of my_promise as well + throw; + } + } + + serial_valve::ticket_guard::~ticket_guard() + { + my_promise->set_value(); + ticket->wait(); + delete ticket; + } + + void serial_valve::ticket_guard::wait_for_my_turn() + { + ticket->wait(); + } + + serial_valve::serial_valve() + { + latch.store( new future( promise::ptr( new promise( true ), true ) ) ); + } + + serial_valve::~serial_valve() + { + fc::future* last = latch.exchange( 0 ); + last->wait(); + delete last; + } +} // namespace fc diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp index e1ad2b7..144175b 100644 --- a/tests/thread/parallel_tests.cpp +++ b/tests/thread/parallel_tests.cpp @@ -177,4 +177,149 @@ BOOST_AUTO_TEST_CASE( sign_verify_parallel ) } } +BOOST_AUTO_TEST_CASE( serial_valve ) +{ + boost::atomic counter(0); + fc::serial_valve valve; + + { // Simple test, f2 finishes before f1 + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + auto p1 = fc::async([&counter,&valve,syncer,waiter] () { + valve.do_serial( [syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p2.f1 has started executing and p2.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p2.wait(); // and wait for p2.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK_EQUAL( 2, counter.load() ); + } + + { // Triple test, f3 finishes first, then f1, finally f2 + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + counter.store(0); + auto p1 = fc::async([&counter,&valve,syncer,waiter] () { + valve.do_serial( [&syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [&syncer](){ syncer->set_value(); + fc::usleep( fc::milliseconds(100) ); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p2.f1 has started executing and is sleeping + + syncer = new fc::promise(); + auto p3 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p3.f1 has started executing and p3.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + BOOST_CHECK( !p3.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p3.wait(); // and wait for p3.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK( p3.ready() ); + BOOST_CHECK_EQUAL( 3, counter.load() ); + } + + { // Triple test again but with invocations from different threads + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + counter.store(0); + auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () { + valve.do_serial( [&syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::do_parallel([&counter,&valve,syncer] () { + valve.do_serial( [&syncer](){ syncer->set_value(); + fc::usleep( fc::milliseconds(100) ); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p2.f1 has started executing and is sleeping + + syncer = new fc::promise(); + auto p3 = fc::do_parallel([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p3.f1 has started executing and p3.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + BOOST_CHECK( !p3.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p3.wait(); // and wait for p3.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK( p3.ready() ); + BOOST_CHECK_EQUAL( 3, counter.load() ); + } +} + BOOST_AUTO_TEST_SUITE_END() From 08a66f52d3b4a69ebf8262f78c4db84a7e33963d Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Sat, 6 Oct 2018 11:05:45 +0200 Subject: [PATCH 14/20] Add option --pool-threads for easier testing --- tests/thread/parallel_tests.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp index 144175b..ddd04ed 100644 --- a/tests/thread/parallel_tests.cpp +++ b/tests/thread/parallel_tests.cpp @@ -33,6 +33,21 @@ #include #include +struct thread_config { + thread_config() { + for( int i = 0; i < boost::unit_test::framework::master_test_suite().argc - 1; ++i ) + if( !strcmp( boost::unit_test::framework::master_test_suite().argv[i], "--pool-threads" ) ) + { + uint16_t threads = atoi(boost::unit_test::framework::master_test_suite().argv[++i]); + std::cout << "Using " << threads << " pool threads\n"; + fc::asio::default_io_service_scope::set_num_threads(threads); + } + } +}; + +BOOST_GLOBAL_FIXTURE( thread_config ); + + BOOST_AUTO_TEST_SUITE(parallel_tests) BOOST_AUTO_TEST_CASE( do_nothing_parallel ) From fc61ef3d040864bfbc18612422eb84718ee610d4 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Sat, 6 Oct 2018 11:06:35 +0200 Subject: [PATCH 15/20] Improved error handling on thread exit --- src/thread/thread.cpp | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 73fa303..e8cd5de 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -80,11 +80,27 @@ namespace fc { p->set_value(); exec(); } catch ( fc::exception& e ) { - wlog( "unhandled exception" ); - p->set_exception( e.dynamic_copy_exception() ); + if( !p->ready() ) + { + wlog( "unhandled exception" ); + p->set_exception( e.dynamic_copy_exception() ); + } + else + { // possibly shutdown? + std::cerr << "unhandled exception in thread '" << name << "'\n"; + std::cerr << e.to_detail_string( log_level::warn ); + } } catch ( ... ) { - wlog( "unhandled exception" ); - p->set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); + if( !p->ready() ) + { + wlog( "unhandled exception" ); + p->set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); + } + else + { // possibly shutdown? + std::cerr << "unhandled exception in thread '" << name << "'\n"; + std::cerr << boost::current_exception_diagnostic_information() << "\n"; + } } } ); p->wait(); From 8eff0016552db641d543e37b6c4fff1fe8cdb5c9 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Sat, 6 Oct 2018 11:07:11 +0200 Subject: [PATCH 16/20] Initialize queues properly --- src/thread/parallel.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp index 5e16961..83ef647 100644 --- a/src/thread/parallel.cpp +++ b/src/thread/parallel.cpp @@ -64,6 +64,7 @@ namespace fc { { public: pool_impl( const uint16_t num_threads ) + : idle_threads( 2 * num_threads ), waiting_tasks( 200 ) { notifiers.resize( num_threads ); threads.reserve( num_threads ); @@ -99,7 +100,8 @@ namespace fc { threads[ini->id]->async_task( task, priority() ); return; } - waiting_tasks.push( task ); + while( !waiting_tasks.push( task ) ) + elog( "Worker pool internal error" ); } task_base* enqueue_idle_thread( idle_notifier_impl* ini ) @@ -110,7 +112,8 @@ namespace fc { fc::unique_lock lock(pool_lock); if( waiting_tasks.pop( task ) ) return task; - idle_threads.push( ini ); + while( !idle_threads.push( ini ) ) + elog( "Worker pool internal error" ); return 0; } private: From d0b280aca76257311ef58929705a317e80915e3e Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Tue, 9 Oct 2018 23:28:02 +0200 Subject: [PATCH 17/20] Fixed possible deadlock --- include/fc/thread/thread.hpp | 4 ++-- src/thread/parallel.cpp | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index bbd8317..931a9fe 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -12,7 +12,7 @@ namespace fc { namespace detail { - class pool_impl; + class worker_pool; void* get_thread_specific_data(unsigned slot); void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); unsigned get_next_unused_task_storage_slot(); @@ -152,7 +152,7 @@ namespace fc { friend class task_base; friend class thread_d; friend class mutex; - friend class detail::pool_impl; + friend class detail::worker_pool; friend void* detail::get_thread_specific_data(unsigned slot); friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); friend unsigned detail::get_next_unused_task_storage_slot(); diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp index 83ef647..dfa85c3 100644 --- a/src/thread/parallel.cpp +++ b/src/thread/parallel.cpp @@ -84,24 +84,23 @@ namespace fc { }); } - void post( task_base* task ) + thread* post( task_base* task ) { idle_notifier_impl* ini; while( idle_threads.pop( ini ) ) if( ini->is_idle.exchange( false ) ) { // minor race condition here, a thread might receive a task while it's busy - threads[ini->id]->async_task( task, priority() ); - return; + return threads[ini->id]; } boost::unique_lock lock(pool_lock); while( idle_threads.pop( ini ) ) if( ini->is_idle.exchange( false ) ) { // minor race condition here, a thread might receive a task while it's busy - threads[ini->id]->async_task( task, priority() ); - return; + return threads[ini->id]; } while( !waiting_tasks.push( task ) ) elog( "Worker pool internal error" ); + return 0; } task_base* enqueue_idle_thread( idle_notifier_impl* ini ) @@ -145,7 +144,9 @@ namespace fc { void worker_pool::post( task_base* task ) { - my->post( task ); + thread* worker = my->post( task ); + if( worker ) + worker->async_task( task, priority() ); } worker_pool& get_worker_pool() From 018642659dced891e3c5769ba2abf1ccce7a9eaa Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Tue, 9 Oct 2018 23:42:03 +0200 Subject: [PATCH 18/20] Fixed some new code smells --- include/fc/thread/parallel.hpp | 2 +- include/fc/thread/thread.hpp | 2 ++ src/asio.cpp | 2 +- src/thread/parallel.cpp | 6 +++--- src/thread/thread_d.hpp | 2 +- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index 3118703..e9311ab 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -49,7 +49,7 @@ namespace fc { private: class ticket_guard { public: - ticket_guard( boost::atomic*>& latch ); + explicit ticket_guard( boost::atomic*>& latch ); ~ticket_guard(); void wait_for_my_turn(); private: diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index 931a9fe..282fc30 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -25,6 +25,8 @@ namespace fc { */ class thread_idle_notifier { public: + virtual ~thread_idle_notifier() {} + /** This method is called when the thread is idle. If it returns a * task_base it will be queued and executed immediately. * @return a task to execute, or nullptr diff --git a/src/asio.cpp b/src/asio.cpp index 0716bad..35b2990 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -119,7 +119,7 @@ namespace fc { { // the default was not set by the configuration. Determine a good // number of threads. Minimum of 8, maximum of hardware_concurrency - num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u ); + num_io_threads = std::max( boost::thread::hardware_concurrency(), 8U ); } for( uint16_t i = 0; i < num_io_threads; ++i ) diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp index dfa85c3..10b709e 100644 --- a/src/thread/parallel.cpp +++ b/src/thread/parallel.cpp @@ -32,8 +32,6 @@ namespace fc { namespace detail { - class pool_impl; - class idle_notifier_impl : public thread_idle_notifier { public: @@ -49,6 +47,8 @@ namespace fc { is_idle.store( copy.is_idle.load() ); } + virtual ~idle_notifier_impl() {} + virtual task_base* idle(); virtual void busy() { @@ -63,7 +63,7 @@ namespace fc { class pool_impl { public: - pool_impl( const uint16_t num_threads ) + explicit pool_impl( const uint16_t num_threads ) : idle_threads( 2 * num_threads ), waiting_tasks( 200 ) { notifiers.resize( num_threads ); diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index d07e3f9..c11403a 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -19,7 +19,7 @@ namespace fc { namespace detail { class idle_guard { public: - idle_guard( thread_d* t ); + explicit idle_guard( thread_d* t ); ~idle_guard(); private: thread_idle_notifier* notifier; From 5b99b41a44bc4548de88a5780660c8a7c7f5b476 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Sat, 10 Nov 2018 21:25:56 +0100 Subject: [PATCH 19/20] Added missing include --- src/thread/thread.cpp | 2 ++ tests/thread/parallel_tests.cpp | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index e8cd5de..f1c1e99 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -4,6 +4,8 @@ #include #include "thread_d.hpp" +#include + #if defined(_MSC_VER) && !defined(NDEBUG) # include const DWORD MS_VC_EXCEPTION=0x406D1388; diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp index ddd04ed..7da4c7b 100644 --- a/tests/thread/parallel_tests.cpp +++ b/tests/thread/parallel_tests.cpp @@ -33,6 +33,8 @@ #include #include +#include + struct thread_config { thread_config() { for( int i = 0; i < boost::unit_test::framework::master_test_suite().argc - 1; ++i ) From ed775a594fb767d63851ede95480f951b9913673 Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Wed, 14 Nov 2018 17:54:09 +0100 Subject: [PATCH 20/20] Added missing include --- include/fc/thread/parallel.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index e9311ab..59f9cf6 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -28,6 +28,8 @@ #include #include +#include + namespace fc { namespace detail {