diff --git a/CMakeLists.txt b/CMakeLists.txt index 261f952..b7873ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -217,6 +217,7 @@ set( fc_sources src/thread/spin_lock.cpp src/thread/spin_yield_lock.cpp src/thread/mutex.cpp + src/thread/parallel.cpp src/thread/non_preemptable_scope_check.cpp src/asio.cpp src/string.cpp diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index 3ad05b2..3d11a38 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -19,7 +19,6 @@ namespace asio { * @brief internal implementation types/methods for fc::asio */ namespace detail { - using namespace fc; class read_write_handler { @@ -59,14 +58,14 @@ namespace asio { bool operator()( C& c, bool s ) { c.non_blocking(s); return true; } }; - #if WIN32 // windows stream handles do not support non blocking! +#if WIN32 // windows stream handles do not support non blocking! template<> struct non_blocking { typedef boost::asio::windows::stream_handle C; bool operator()( C& ) { return false; } bool operator()( C&, bool ) { return false; } }; - #endif +#endif } // end of namespace detail /*** @@ -78,7 +77,8 @@ namespace asio { public: default_io_service_scope(); ~default_io_service_scope(); - static void set_num_threads(uint16_t num_threads); + static void set_num_threads(uint16_t num_threads); + static uint16_t get_num_threads(); boost::asio::io_service* io; private: std::vector asio_threads; diff --git a/include/fc/container/deque.hpp b/include/fc/container/deque.hpp deleted file mode 100644 index 6a05dc8..0000000 --- a/include/fc/container/deque.hpp +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include -#include - -namespace fc { - namespace raw { - - - } // namespace raw - -} // namespace fc diff --git a/include/fc/crypto/sha1.hpp b/include/fc/crypto/sha1.hpp index a269797..01c6ca8 100644 --- a/include/fc/crypto/sha1.hpp +++ b/include/fc/crypto/sha1.hpp @@ -2,6 +2,8 @@ #include #include +#include + namespace fc{ class sha1 @@ -82,3 +84,6 @@ namespace std } }; } + +#include +FC_REFLECT_TYPENAME( fc::sha1 ) diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index cc2c317..0d0e2e7 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -21,7 +21,6 @@ #endif namespace fc { - class abstract_thread; struct void_t{}; class priority; class thread; @@ -146,7 +145,9 @@ namespace fc { public: typedef fc::shared_ptr< promise > ptr; promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} - //promise( const void_t& ){ set_value(); } + promise( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG ){ + if( fulfilled ) set_value(); + } void wait(const microseconds& timeout = microseconds::maximum() ){ this->_wait( timeout ); diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp new file mode 100644 index 0000000..59f9cf6 --- /dev/null +++ b/include/fc/thread/parallel.hpp @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include +#include +#include + +#include + +namespace fc { + + namespace detail { + class pool_impl; + + class worker_pool { + public: + worker_pool(); + ~worker_pool(); + void post( task_base* task ); + private: + pool_impl* my; + }; + + worker_pool& get_worker_pool(); + } + + class serial_valve { + private: + class ticket_guard { + public: + explicit ticket_guard( boost::atomic*>& latch ); + ~ticket_guard(); + void wait_for_my_turn(); + private: + promise* my_promise; + future* ticket; + }; + + friend class ticket_guard; + boost::atomic*> latch; + + public: + serial_valve(); + ~serial_valve(); + + /** Executes f1() then f2(). + * For any two calls do_serial(f1,f2) and do_serial(f1',f2') where + * do_serial(f1,f2) is invoked before do_serial(f1',f2'), it is + * guaranteed that f2' will be executed after f2 has completed. Failure + * of either function counts as completion of both. + * If f1 throws then f2 will not be invoked. + * + * @param f1 a functor to invoke + * @param f2 a functor to invoke + * @return the return value of f2() + */ + template + auto do_serial( const Functor1& f1, const Functor2& f2 ) -> decltype(f2()) + { + ticket_guard guard( latch ); + f1(); + guard.wait_for_my_turn(); + return f2(); + } + }; + + /** + * Calls function f in a separate thread and returns a future + * that can be used to wait on the result. + * + * @param f the operation to perform + */ + template + auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future { + typedef decltype(f()) Result; + typedef typename fc::deduce::type FunctorType; + fc::task* tsk = + new fc::task( fc::forward(f), desc ); + fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); + detail::get_worker_pool().post( tsk ); + return r; + } +} diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index dd52b77..6c79d71 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -25,6 +25,7 @@ namespace fc { }; void* get_task_specific_data(unsigned slot); void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); + class idle_guard; } class task_base : virtual public promise_base { @@ -53,6 +54,7 @@ namespace fc { // thread/thread_private friend class thread; friend class thread_d; + friend class detail::idle_guard; fwd _spinlock; // avoid rtti info for every possible functor... diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index aea55da..282fc30 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -12,6 +12,7 @@ namespace fc { namespace detail { + class worker_pool; void* get_thread_specific_data(unsigned slot); void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); unsigned get_next_unused_task_storage_slot(); @@ -19,11 +20,29 @@ namespace fc { void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); } + /** Instances of this class can be used to get notifications when a thread is + * (or is no longer) idle. + */ + class thread_idle_notifier { + public: + virtual ~thread_idle_notifier() {} + + /** This method is called when the thread is idle. If it returns a + * task_base it will be queued and executed immediately. + * @return a task to execute, or nullptr + */ + virtual task_base* idle() = 0; + /** This method is called when the thread is no longer idle, e. g. after + * it has woken up due to a timer or signal. + */ + virtual void busy() = 0; + }; + class thread { public: - thread( const std::string& name = "" ); - thread( thread&& m ); - thread& operator=(thread&& t ); + thread( const std::string& name = "", thread_idle_notifier* notifier = 0 ); + thread( thread&& m ) = delete; + thread& operator=(thread&& t ) = delete; /** * Returns the current thread. @@ -130,11 +149,12 @@ namespace fc { return wait_any_until(fc::move(proms), fc::time_point::now()+timeout_us ); } private: - thread( class thread_d* ); + thread( class thread_d* ); // parameter is ignored, will create a new thread_d friend class promise_base; friend class task_base; friend class thread_d; friend class mutex; + friend class detail::worker_pool; friend void* detail::get_thread_specific_data(unsigned slot); friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*)); friend unsigned detail::get_next_unused_task_storage_slot(); diff --git a/src/asio.cpp b/src/asio.cpp index 909f4a5..35b2990 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -58,7 +58,6 @@ namespace fc { } else { - //elog( "${message} ", ("message", boost::system::system_error(ec).what())); p->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); } @@ -83,8 +82,6 @@ namespace fc { } p->set_value( eps ); } else { - //elog( "%s", boost::system::system_error(ec).what() ); - //p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); p->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "process exited with: ${message} ", @@ -104,10 +101,12 @@ namespace fc { * @param num_threads the number of threads */ void default_io_service_scope::set_num_threads(uint16_t num_threads) { - FC_ASSERT(fc::asio::default_io_service_scope::num_io_threads == 0); - fc::asio::default_io_service_scope::num_io_threads = num_threads; + FC_ASSERT(num_io_threads == 0); + num_io_threads = num_threads; } + uint16_t default_io_service_scope::get_num_threads() { return num_io_threads; } + /*** * Default constructor */ @@ -116,18 +115,18 @@ namespace fc { io = new boost::asio::io_service(); the_work = new boost::asio::io_service::work(*io); - if (this->num_io_threads == 0) + if( num_io_threads == 0 ) { // the default was not set by the configuration. Determine a good // number of threads. Minimum of 8, maximum of hardware_concurrency - this->num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u ); + num_io_threads = std::max( boost::thread::hardware_concurrency(), 8U ); } - for( uint16_t i = 0; i < this->num_io_threads; ++i ) + for( uint16_t i = 0; i < num_io_threads; ++i ) { - asio_threads.push_back( new boost::thread( [=]() + asio_threads.push_back( new boost::thread( [i,this]() { - fc::thread::current().set_name("asio"); + fc::thread::current().set_name( "fc::asio worker #" + fc::to_string(i) ); BOOST_SCOPE_EXIT(void) { @@ -194,7 +193,7 @@ namespace fc { promise >::ptr p( new promise >("tcp::resolve completion") ); res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), boost::bind( detail::resolve_handler, p, _1, _2 ) ); - return p->wait();; + return p->wait(); } FC_RETHROW_EXCEPTIONS(warn, "") } diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp new file mode 100644 index 0000000..10b709e --- /dev/null +++ b/src/thread/parallel.cpp @@ -0,0 +1,202 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include +#include +#include + +#include +#include + +namespace fc { + namespace detail { + class idle_notifier_impl : public thread_idle_notifier + { + public: + idle_notifier_impl() + { + is_idle.store(false); + } + + idle_notifier_impl( const idle_notifier_impl& copy ) + { + id = copy.id; + my_pool = copy.my_pool; + is_idle.store( copy.is_idle.load() ); + } + + virtual ~idle_notifier_impl() {} + + virtual task_base* idle(); + virtual void busy() + { + is_idle.store(false); + } + + uint32_t id; + pool_impl* my_pool; + boost::atomic is_idle; + }; + + class pool_impl + { + public: + explicit pool_impl( const uint16_t num_threads ) + : idle_threads( 2 * num_threads ), waiting_tasks( 200 ) + { + notifiers.resize( num_threads ); + threads.reserve( num_threads ); + for( uint32_t i = 0; i < num_threads; i++ ) + { + notifiers[i].id = i; + notifiers[i].my_pool = this; + threads.push_back( new thread( "pool worker " + fc::to_string(i), ¬ifiers[i] ) ); + } + } + ~pool_impl() + { + for( thread* t : threads) + delete t; // also calls quit() + waiting_tasks.consume_all( [] ( task_base* t ) { + t->cancel( "thread pool quitting" ); + }); + } + + thread* post( task_base* task ) + { + idle_notifier_impl* ini; + while( idle_threads.pop( ini ) ) + if( ini->is_idle.exchange( false ) ) + { // minor race condition here, a thread might receive a task while it's busy + return threads[ini->id]; + } + boost::unique_lock lock(pool_lock); + while( idle_threads.pop( ini ) ) + if( ini->is_idle.exchange( false ) ) + { // minor race condition here, a thread might receive a task while it's busy + return threads[ini->id]; + } + while( !waiting_tasks.push( task ) ) + elog( "Worker pool internal error" ); + return 0; + } + + task_base* enqueue_idle_thread( idle_notifier_impl* ini ) + { + task_base* task; + if( waiting_tasks.pop( task ) ) + return task; + fc::unique_lock lock(pool_lock); + if( waiting_tasks.pop( task ) ) + return task; + while( !idle_threads.push( ini ) ) + elog( "Worker pool internal error" ); + return 0; + } + private: + std::vector notifiers; + std::vector threads; + boost::lockfree::queue idle_threads; + boost::lockfree::queue waiting_tasks; + fc::spin_yield_lock pool_lock; + }; + + task_base* idle_notifier_impl::idle() + { + is_idle.store( true ); + task_base* result = my_pool->enqueue_idle_thread( this ); + if( result ) is_idle.store( false ); + return result; + } + + worker_pool::worker_pool() + { + fc::asio::default_io_service(); + my = new pool_impl( fc::asio::default_io_service_scope::get_num_threads() ); + } + + worker_pool::~worker_pool() + { + delete my; + } + + void worker_pool::post( task_base* task ) + { + thread* worker = my->post( task ); + if( worker ) + worker->async_task( task, priority() ); + } + + worker_pool& get_worker_pool() + { + static worker_pool the_pool; + return the_pool; + } + } + + serial_valve::ticket_guard::ticket_guard( boost::atomic*>& latch ) + { + my_promise = new promise(); + future* my_future = new future( promise::ptr( my_promise, true ) ); + try + { + do + { + ticket = latch.load(); + FC_ASSERT( ticket, "Valve is shutting down!" ); + } + while( !latch.compare_exchange_weak( ticket, my_future ) ); + } + catch (...) + { + delete my_future; // this takes care of my_promise as well + throw; + } + } + + serial_valve::ticket_guard::~ticket_guard() + { + my_promise->set_value(); + ticket->wait(); + delete ticket; + } + + void serial_valve::ticket_guard::wait_for_my_turn() + { + ticket->wait(); + } + + serial_valve::serial_valve() + { + latch.store( new future( promise::ptr( new promise( true ), true ) ) ); + } + + serial_valve::~serial_valve() + { + fc::future* last = latch.exchange( 0 ); + last->wait(); + delete last; + } +} // namespace fc diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 7ca01f2..f1c1e99 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -4,6 +4,8 @@ #include #include "thread_d.hpp" +#include + #if defined(_MSC_VER) && !defined(NDEBUG) # include const DWORD MS_VC_EXCEPTION=0x406D1388; @@ -34,7 +36,7 @@ static void set_thread_name(const char* threadName) { } } -#elif defined(__linux__) && !defined(NDEBUG) +#elif defined(__linux__) # include static void set_thread_name(const char* threadName) { @@ -62,59 +64,58 @@ namespace fc { } thread*& current_thread() { - #ifdef _MSC_VER +#ifdef _MSC_VER static __declspec(thread) thread* t = NULL; - #else +#else static __thread thread* t = NULL; - #endif +#endif return t; } - thread::thread( const std::string& name ) { + thread::thread( const std::string& name, thread_idle_notifier* notifier ) { promise::ptr p(new promise("thread start")); - boost::thread* t = new boost::thread( [this,p,name]() { + boost::thread* t = new boost::thread( [this,p,name,notifier]() { try { set_thread_name(name.c_str()); // set thread's name for the debugger to display - this->my = new thread_d(*this); + this->my = new thread_d( *this, notifier ); current_thread() = this; p->set_value(); exec(); } catch ( fc::exception& e ) { - wlog( "unhandled exception" ); - p->set_exception( e.dynamic_copy_exception() ); + if( !p->ready() ) + { + wlog( "unhandled exception" ); + p->set_exception( e.dynamic_copy_exception() ); + } + else + { // possibly shutdown? + std::cerr << "unhandled exception in thread '" << name << "'\n"; + std::cerr << e.to_detail_string( log_level::warn ); + } } catch ( ... ) { - wlog( "unhandled exception" ); - p->set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); - //assert( !"unhandled exception" ); - //elog( "Caught unhandled exception %s", boost::current_exception_diagnostic_information().c_str() ); + if( !p->ready() ) + { + wlog( "unhandled exception" ); + p->set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); + } + else + { // possibly shutdown? + std::cerr << "unhandled exception in thread '" << name << "'\n"; + std::cerr << boost::current_exception_diagnostic_information() << "\n"; + } } } ); p->wait(); my->boost_thread = t; my->name = name; - //wlog("name:${n} tid:${tid}", ("n", name)("tid", (uintptr_t)my->boost_thread->native_handle()) ); } thread::thread( thread_d* ) { my = new thread_d(*this); } - thread::thread( thread&& m ) { - my = m.my; - m.my = 0; - } - - thread& thread::operator=(thread&& t ) { - fc_swap(t.my,my); - return *this; - } - thread::~thread() { - //wlog( "my ${n}", ("n",name()) ); if( my ) - { - // wlog( "calling quit() on ${n}",("n",my->name) ); quit(); - } delete my; } @@ -139,7 +140,7 @@ namespace fc { { if (!is_current()) { - async([=](){ set_name(n); }, "set_name").wait(); + async([this,n](){ set_name(n); }, "set_name").wait(); return; } my->name = n; @@ -162,17 +163,13 @@ namespace fc { if( !is_current() ) { auto t = my->boost_thread; - async( [=](){quit();}, "thread::quit" );//.wait(); + async( [this](){quit();}, "thread::quit" ); if( t ) - { - //wlog("destroying boost thread ${tid}",("tid",(uintptr_t)my->boost_thread->native_handle())); t->join(); - } return; } my->done = true; - // wlog( "${s}", ("s",name()) ); // We are quiting from our own thread... // break all promises, thread quit! @@ -183,20 +180,14 @@ namespace fc { { fc::context* n = cur->next; // this will move the context into the ready list. - //cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) ); - //cur->set_exception_on_blocking_promises( thread_quit() ); cur->set_exception_on_blocking_promises( std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) ); cur = n; } if( my->blocked ) - { - //wlog( "still blocking... whats up with that?"); debug( "on quit" ); - } } BOOST_ASSERT( my->blocked == 0 ); - //my->blocked = 0; for (task_base* unstarted_task : my->task_pqueue) unstarted_task->set_exception(std::make_shared(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting"))); @@ -324,7 +315,6 @@ namespace fc { if( p[i]->ready() ) return i; - //BOOST_THROW_EXCEPTION( wait_any_error() ); return -1; } @@ -340,8 +330,6 @@ namespace fc { void thread::async_task( task_base* t, const priority& p, const time_point& tp ) { assert(my); t->_when = tp; - // slog( "when %lld", t->_when.time_since_epoch().count() ); - // slog( "delay %lld", (tp - fc::time_point::now()).count() ); task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed); do { t->_next = stale_head; }while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) ); @@ -391,7 +379,6 @@ namespace fc { if( !my->current ) my->current = new fc::context(&fc::thread::current()); - //slog( " %1% blocking on %2%", my->current, p.get() ); my->current->add_blocking_promise(p.get(), true); // if not max timeout, added to sleep pqueue @@ -404,15 +391,10 @@ namespace fc { sleep_priority_less() ); } - // elog( "blocking %1%", my->current ); my->add_to_blocked( my->current ); - // my->debug("swtiching fibers..." ); - my->start_next_fiber(); - // slog( "resuming %1%", my->current ); - //slog( " %1% unblocking blocking on %2%", my->current, p.get() ); my->current->remove_blocking_promise(p.get()); my->check_fiber_exceptions(); @@ -420,7 +402,6 @@ namespace fc { void thread::notify( const promise_base::ptr& p ) { - //slog( "this %p my %p", this, my ); BOOST_ASSERT(p->ready()); if( !is_current() ) { @@ -483,7 +464,7 @@ namespace fc { void thread::notify_task_has_been_canceled() { - async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); + async( [this](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() ); } void thread::unblock(fc::context* c) @@ -491,6 +472,26 @@ namespace fc { my->unblock(c); } + namespace detail { + idle_guard::idle_guard( thread_d* t ) : notifier(t->notifier) + { + if( notifier ) + { + task_base* work = notifier->idle(); + if( work ) + { + task_base* stale_head = t->task_in_queue.load(boost::memory_order_relaxed); + do { + work->_next = stale_head; + } while( !t->task_in_queue.compare_exchange_weak( stale_head, work, boost::memory_order_release ) ); + } + } + } + idle_guard::~idle_guard() + { + if( notifier ) notifier->busy(); + } + } #ifdef _MSC_VER /* support for providing a structured exception handler for async tasks */ diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 6ad9421..c11403a 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -15,12 +15,23 @@ namespace fc { return a->resume_time > b->resume_time; } }; + + namespace detail { + class idle_guard { + public: + explicit idle_guard( thread_d* t ); + ~idle_guard(); + private: + thread_idle_notifier* notifier; + }; + } + class thread_d { public: using context_pair = std::pair; - thread_d(fc::thread& s) + thread_d( fc::thread& s, thread_idle_notifier* n = 0 ) :self(s), boost_thread(0), task_in_queue(0), next_posted_num(1), @@ -28,7 +39,8 @@ namespace fc { current(0), pt_head(0), blocked(0), - next_unused_task_storage_slot(0) + next_unused_task_storage_slot(0), + notifier(n) #ifndef NDEBUG ,non_preemptable_scope_count(0) #endif @@ -98,6 +110,8 @@ namespace fc { std::vector non_task_specific_data; unsigned next_unused_task_storage_slot; + thread_idle_notifier *notifier; + #ifndef NDEBUG unsigned non_preemptable_scope_count; #endif @@ -585,6 +599,11 @@ namespace fc { if( done ) return; + + detail::idle_guard guard( this ); + if( task_in_queue.load(boost::memory_order_relaxed) ) + continue; + if( timeout_time == time_point::maximum() ) task_ready.wait( lock ); else if( timeout_time != time_point::min() ) @@ -666,7 +685,7 @@ namespace fc { { if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); }, "thread_d::unblock" ); + self.async( [this,c](){ unblock(c); }, "thread_d::unblock" ); return; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 600f6f8..17c8e5f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -53,6 +53,7 @@ add_executable( all_tests all_tests.cpp network/http/websocket_test.cpp thread/task_cancel.cpp thread/thread_tests.cpp + thread/parallel_tests.cpp bloom_test.cpp real128_test.cpp serialization_test.cpp diff --git a/tests/io/tcp_test.cpp b/tests/io/tcp_test.cpp index 98ef876..0a43975 100644 --- a/tests/io/tcp_test.cpp +++ b/tests/io/tcp_test.cpp @@ -18,10 +18,6 @@ BOOST_AUTO_TEST_CASE(tcpconstructor_test) class my_io_class : public fc::asio::default_io_service_scope { public: - uint16_t get_num_threads() - { - return fc::asio::default_io_service_scope::num_io_threads; - } static void reset_num_threads() { fc::asio::default_io_service_scope::num_io_threads = 0; } }; diff --git a/tests/run-parallel-tests.sh b/tests/run-parallel-tests.sh index ac7eccc..adc3eb2 100755 --- a/tests/run-parallel-tests.sh +++ b/tests/run-parallel-tests.sh @@ -5,14 +5,26 @@ if [ "$#" != 1 ]; then exit 1 fi -"$1" --list_content 2>&1 \ - | grep '\*$' \ - | sed 's=\*$==;s=^ =/=' \ - | while read t; do +CACHE="$( find . -name CMakeCache.txt )" +if [ "$CACHE" != "" ]; then + BOOST_INC="$( grep Boost_INCLUDE_DIR:PATH "$CACHE" | cut -d= -f 2 )" + if [ "$BOOST_INC" != "" ]; then + BOOST_VERSION="$( grep '^#define *BOOST_VERSION ' "$BOOST_INC/boost/version.hpp" | sed 's=^.* ==' )" + fi +fi + +if [ "$BOOST_VERSION" = "" -o "$BOOST_VERSION" -lt 105900 ]; then + echo "Boost version '$BOOST_VERSION' - executing tests serially" + "$1" +else + "$1" --list_content 2>&1 \ + | grep '\*$' \ + | sed 's=\*$==;s=^ =/=' \ + | while read t; do case "$t" in /*) echo "$pre$t"; ;; *) pre="$t"; ;; esac - done \ - | parallel echo Running {}\; "$1" -t {} - + done \ + | parallel echo Running {}\; "$1" -t {} +fi diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp new file mode 100644 index 0000000..7da4c7b --- /dev/null +++ b/tests/thread/parallel_tests.cpp @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +struct thread_config { + thread_config() { + for( int i = 0; i < boost::unit_test::framework::master_test_suite().argc - 1; ++i ) + if( !strcmp( boost::unit_test::framework::master_test_suite().argv[i], "--pool-threads" ) ) + { + uint16_t threads = atoi(boost::unit_test::framework::master_test_suite().argv[++i]); + std::cout << "Using " << threads << " pool threads\n"; + fc::asio::default_io_service_scope::set_num_threads(threads); + } + } +}; + +BOOST_GLOBAL_FIXTURE( thread_config ); + + +BOOST_AUTO_TEST_SUITE(parallel_tests) + +BOOST_AUTO_TEST_CASE( do_nothing_parallel ) +{ + std::vector> results; + results.reserve( 20 ); + for( size_t i = 0; i < results.capacity(); i++ ) + results.push_back( fc::do_parallel( [i] () { std::cout << i << ","; } ) ); + for( auto& result : results ) + result.wait(); + std::cout << "\n"; +} + +BOOST_AUTO_TEST_CASE( do_something_parallel ) +{ + struct result { + boost::thread::id thread_id; + int call_count; + }; + + std::vector> results; + results.reserve( 20 ); + boost::thread_specific_ptr tls; + for( size_t i = 0; i < results.capacity(); i++ ) + results.push_back( fc::do_parallel( [i,&tls] () { + if( !tls.get() ) { tls.reset( new int(0) ); } + result res = { boost::this_thread::get_id(), (*tls.get())++ }; + return res; + } ) ); + + std::map> results_by_thread; + for( auto& res : results ) + { + result r = res.wait(); + results_by_thread[r.thread_id].push_back( r.call_count ); + } + + BOOST_CHECK( results_by_thread.size() > 1 ); // require execution by more than 1 thread + for( auto& pair : results_by_thread ) + { // check that thread_local_storage counter works + std::sort( pair.second.begin(), pair.second.end() ); + for( size_t i = 0; i < pair.second.size(); i++ ) + BOOST_CHECK_EQUAL( i, pair.second[i] ); + } +} + +const std::string TEXT = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"$%&/()=?,.-#+´{[]}`*'_:;<>|"; + +template +class hash_test { + public: + std::string _hashname = fc::get_typename::name(); + + void run_single_threaded() { + const std::string first = Hash::hash(TEXT).str(); + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 1000; i++ ) + BOOST_CHECK_EQUAL( first, Hash::hash(TEXT).str() ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} single-threaded ${h}'s in ${t}µs", ("c",1000)("h",_hashname)("t",end-start) ); + } + + void run_multi_threaded() { + const std::string first = Hash::hash(TEXT).str(); + std::vector> results; + results.reserve( 10000 ); + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 10000; i++ ) + results.push_back( fc::do_parallel( [] () { return Hash::hash(TEXT).str(); } ) ); + for( auto& result: results ) + BOOST_CHECK_EQUAL( first, result.wait() ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} multi-threaded ${h}'s in ${t}µs", ("c",10000)("h",_hashname)("t",end-start) ); + } + + void run() { + run_single_threaded(); + run_multi_threaded(); + } +}; + +BOOST_AUTO_TEST_CASE( hash_parallel ) +{ + hash_test().run(); + hash_test().run(); + hash_test().run(); + hash_test().run(); + hash_test().run(); +} + +BOOST_AUTO_TEST_CASE( sign_verify_parallel ) +{ + const fc::sha256 HASH = fc::sha256::hash(TEXT); + + std::vector keys; + keys.reserve(1000); + for( int i = 0; i < 1000; i++ ) + keys.push_back( fc::ecc::private_key::regenerate( fc::sha256::hash( TEXT + fc::to_string(i) ) ) ); + + std::vector sigs; + sigs.reserve( 10 * keys.size() ); + { + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 10; i++ ) + for( const auto& key: keys ) + sigs.push_back( key.sign_compact( HASH ) ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} single-threaded signatures in ${t}µs", ("c",sigs.size())("t",end-start) ); + } + + { + fc::time_point start = fc::time_point::now(); + for( size_t i = 0; i < sigs.size(); i++ ) + BOOST_CHECK( keys[i % keys.size()].get_public_key() == fc::ecc::public_key( sigs[i], HASH ) ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} single-threaded verifies in ${t}µs", ("c",sigs.size())("t",end-start) ); + } + + { + std::vector> results; + results.reserve( 10 * keys.size() ); + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < 10; i++ ) + for( const auto& key: keys ) + results.push_back( fc::do_parallel( [&key,&HASH] () { return key.sign_compact( HASH ); } ) ); + for( auto& res : results ) + res.wait(); + fc::time_point end = fc::time_point::now(); + ilog( "${c} multi-threaded signatures in ${t}µs", ("c",sigs.size())("t",end-start) ); + } + + { + std::vector> results; + results.reserve( sigs.size() ); + fc::time_point start = fc::time_point::now(); + for( const auto& sig: sigs ) + results.push_back( fc::do_parallel( [&sig,&HASH] () { return fc::ecc::public_key( sig, HASH ); } ) ); + for( size_t i = 0; i < results.size(); i++ ) + BOOST_CHECK( keys[i % keys.size()].get_public_key() == results[i].wait() ); + fc::time_point end = fc::time_point::now(); + ilog( "${c} multi-threaded verifies in ${t}µs", ("c",sigs.size())("t",end-start) ); + } +} + +BOOST_AUTO_TEST_CASE( serial_valve ) +{ + boost::atomic counter(0); + fc::serial_valve valve; + + { // Simple test, f2 finishes before f1 + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + auto p1 = fc::async([&counter,&valve,syncer,waiter] () { + valve.do_serial( [syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p2.f1 has started executing and p2.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p2.wait(); // and wait for p2.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK_EQUAL( 2, counter.load() ); + } + + { // Triple test, f3 finishes first, then f1, finally f2 + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + counter.store(0); + auto p1 = fc::async([&counter,&valve,syncer,waiter] () { + valve.do_serial( [&syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [&syncer](){ syncer->set_value(); + fc::usleep( fc::milliseconds(100) ); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p2.f1 has started executing and is sleeping + + syncer = new fc::promise(); + auto p3 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p3.f1 has started executing and p3.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + BOOST_CHECK( !p3.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p3.wait(); // and wait for p3.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK( p3.ready() ); + BOOST_CHECK_EQUAL( 3, counter.load() ); + } + + { // Triple test again but with invocations from different threads + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + counter.store(0); + auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () { + valve.do_serial( [&syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::do_parallel([&counter,&valve,syncer] () { + valve.do_serial( [&syncer](){ syncer->set_value(); + fc::usleep( fc::milliseconds(100) ); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p2.f1 has started executing and is sleeping + + syncer = new fc::promise(); + auto p3 = fc::do_parallel([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p3.f1 has started executing and p3.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + BOOST_CHECK( !p3.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p3.wait(); // and wait for p3.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK( p3.ready() ); + BOOST_CHECK_EQUAL( 3, counter.load() ); + } +} + +BOOST_AUTO_TEST_SUITE_END()