diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp new file mode 100644 index 0000000..eebfaeb --- /dev/null +++ b/include/fc/thread/parallel.hpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include +#include + +namespace fc { + + namespace detail { + template + class parallel_completion_handler { + public: + parallel_completion_handler( Task* task ) : _task(task) {} + void operator()() { _task->run(); } + private: + Task* _task; + }; + } + + /** + * Calls function f in a separate thread and returns a future + * that can be used to wait on the result. + * + * @param f the operation to perform + */ + template + auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future { + typedef decltype(f()) Result; + typedef typename fc::deduce::type FunctorType; + fc::task* tsk = + new fc::task( fc::forward(f), desc ); + fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); + fc::asio::default_io_service().post( detail::parallel_completion_handler>( tsk ) ); + return r; + } +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 600f6f8..17c8e5f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -53,6 +53,7 @@ add_executable( all_tests all_tests.cpp network/http/websocket_test.cpp thread/task_cancel.cpp thread/thread_tests.cpp + thread/parallel_tests.cpp bloom_test.cpp real128_test.cpp serialization_test.cpp diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp new file mode 100644 index 0000000..67ec6ae --- /dev/null +++ b/tests/thread/parallel_tests.cpp @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2018 The BitShares Blockchain, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include + +#include + +BOOST_AUTO_TEST_SUITE(parallel_tests) + +BOOST_AUTO_TEST_CASE( do_nothing_parallel ) +{ + std::vector> results; + results.reserve( 20 ); + for( size_t i = 0; i < results.capacity(); i++ ) + results.push_back( fc::do_parallel( [i] () { std::cout << i << ","; } ) ); + for( auto& result : results ) + result.wait(); + std::cout << "\n"; +} + +BOOST_AUTO_TEST_CASE( do_something_parallel ) +{ + struct result { + boost::thread::id thread_id; + int call_count; + }; + + std::vector> results; + results.reserve( 20 ); + boost::thread_specific_ptr tls; + for( size_t i = 0; i < results.capacity(); i++ ) + results.push_back( fc::do_parallel( [i,&tls] () { + if( !tls.get() ) { tls.reset( new int(0) ); } + result res = { boost::this_thread::get_id(), (*tls.get())++ }; + return res; + } ) ); + + std::map> results_by_thread; + for( auto& res : results ) + { + result r = res.wait(); + results_by_thread[r.thread_id].push_back( r.call_count ); + } + + BOOST_CHECK( results_by_thread.size() > 1 ); // require execution by more than 1 thread + for( auto& pair : results_by_thread ) + { // check that thread_local_storage counter works + std::sort( pair.second.begin(), pair.second.end() ); + for( size_t i = 0; i < pair.second.size(); i++ ) + BOOST_CHECK_EQUAL( i, pair.second[i] ); + } +} + +BOOST_AUTO_TEST_SUITE_END()