diff --git a/CMakeLists.txt b/CMakeLists.txt index 7b76491..23bf263 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -217,6 +217,7 @@ set( fc_sources src/thread/spin_lock.cpp src/thread/spin_yield_lock.cpp src/thread/mutex.cpp + src/thread/parallel.cpp src/thread/non_preemptable_scope_check.cpp src/asio.cpp src/string.cpp diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index a2a0fb7..3d11a38 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -77,7 +77,8 @@ namespace asio { public: default_io_service_scope(); ~default_io_service_scope(); - static void set_num_threads(uint16_t num_threads); + static void set_num_threads(uint16_t num_threads); + static uint16_t get_num_threads(); boost::asio::io_service* io; private: std::vector asio_threads; diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index 3425b85..c56047b 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -25,23 +25,24 @@ #pragma once #include +#include #include -/* NOTE: the methods in this header are NOT to be mixed up with fc's - * multithreading. Parallel functions MUST NOT call fc::thread::yield NOR - * use fc's mutexes etc.! - */ namespace fc { namespace detail { - template - class parallel_completion_handler { - public: - parallel_completion_handler( Task* task ) : _task(task) {} - void operator()() { _task->run(); } - private: - Task* _task; + class pool_impl; + + class worker_pool { + public: + worker_pool(); + ~worker_pool(); + void post( task_base* task ); + private: + pool_impl* my; }; + + worker_pool& get_worker_pool(); } /** @@ -57,7 +58,7 @@ namespace fc { fc::task* tsk = new fc::task( fc::forward(f), desc ); fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - fc::asio::default_io_service().post( detail::parallel_completion_handler>( tsk ) ); + detail::get_worker_pool().post( tsk ); return r; } } diff --git a/src/asio.cpp b/src/asio.cpp index cec7de6..0716bad 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -101,10 +101,12 @@ namespace fc { * @param num_threads the number of threads */ void default_io_service_scope::set_num_threads(uint16_t num_threads) { - FC_ASSERT(fc::asio::default_io_service_scope::num_io_threads == 0); - fc::asio::default_io_service_scope::num_io_threads = num_threads; + FC_ASSERT(num_io_threads == 0); + num_io_threads = num_threads; } + uint16_t default_io_service_scope::get_num_threads() { return num_io_threads; } + /*** * Default constructor */ @@ -113,14 +115,14 @@ namespace fc { io = new boost::asio::io_service(); the_work = new boost::asio::io_service::work(*io); - if (this->num_io_threads == 0) + if( num_io_threads == 0 ) { // the default was not set by the configuration. Determine a good // number of threads. Minimum of 8, maximum of hardware_concurrency - this->num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u ); + num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u ); } - for( uint16_t i = 0; i < this->num_io_threads; ++i ) + for( uint16_t i = 0; i < num_io_threads; ++i ) { asio_threads.push_back( new boost::thread( [i,this]() { diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp new file mode 100644 index 0000000..046b283 --- /dev/null +++ b/src/thread/parallel.cpp @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include +#include +#include + +#include +#include + +namespace fc { + namespace detail { + class pool_impl; + + class idle_notifier_impl : public thread_idle_notifier + { + public: + idle_notifier_impl() + { + is_idle.store(false); + } + + idle_notifier_impl( const idle_notifier_impl& copy ) + { + id = copy.id; + my_pool = copy.my_pool; + is_idle.store( copy.is_idle.load() ); + } + + virtual task_base* idle(); + virtual void busy() + { + is_idle.store(false); + } + + uint32_t id; + pool_impl* my_pool; + boost::atomic is_idle; + }; + + class pool_impl + { + public: + pool_impl( const uint16_t num_threads ) + { + notifiers.resize( num_threads ); + threads.reserve( num_threads ); + for( uint32_t i = 0; i < num_threads; i++ ) + { + notifiers[i].id = i; + notifiers[i].my_pool = this; + threads.push_back( new thread( "pool worker " + fc::to_string(i), ¬ifiers[i] ) ); + } + } + ~pool_impl() + { + for( thread* t : threads) + delete t; // also calls quit() + waiting_tasks.consume_all( [] ( task_base* t ) { + t->cancel( "thread pool quitting" ); + }); + } + + void post( task_base* task ) + { + idle_notifier_impl* ini; + while( idle_threads.pop( ini ) ) + if( ini->is_idle.exchange( false ) ) + { // minor race condition here, a thread might receive a task while it's busy + threads[ini->id]->async_task( task, priority() ); + return; + } + boost::unique_lock lock(pool_lock); + while( idle_threads.pop( ini ) ) + if( ini->is_idle.exchange( false ) ) + { // minor race condition here, a thread might receive a task while it's busy + threads[ini->id]->async_task( task, priority() ); + return; + } + waiting_tasks.push( task ); + } + + task_base* enqueue_idle_thread( idle_notifier_impl* ini ) + { + task_base* task; + if( waiting_tasks.pop( task ) ) + return task; + fc::unique_lock lock(pool_lock); + if( waiting_tasks.pop( task ) ) + return task; + idle_threads.push( ini ); + return 0; + } + private: + std::vector notifiers; + std::vector threads; + boost::lockfree::queue idle_threads; + boost::lockfree::queue waiting_tasks; + fc::spin_yield_lock pool_lock; + }; + + task_base* idle_notifier_impl::idle() + { + is_idle.store( true ); + task_base* result = my_pool->enqueue_idle_thread( this ); + if( result ) is_idle.store( false ); + return result; + } + + worker_pool::worker_pool() + { + fc::asio::default_io_service(); + my = new pool_impl( fc::asio::default_io_service_scope::get_num_threads() ); + } + + worker_pool::~worker_pool() + { + delete my; + } + + void worker_pool::post( task_base* task ) + { + my->post( task ); + } + + worker_pool& get_worker_pool() + { + static worker_pool the_pool; + return the_pool; + } + } +} diff --git a/tests/io/tcp_test.cpp b/tests/io/tcp_test.cpp index 98ef876..0a43975 100644 --- a/tests/io/tcp_test.cpp +++ b/tests/io/tcp_test.cpp @@ -18,10 +18,6 @@ BOOST_AUTO_TEST_CASE(tcpconstructor_test) class my_io_class : public fc::asio::default_io_service_scope { public: - uint16_t get_num_threads() - { - return fc::asio::default_io_service_scope::num_io_threads; - } static void reset_num_threads() { fc::asio::default_io_service_scope::num_io_threads = 0; } };