diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index c56047b..3118703 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -45,6 +45,46 @@ namespace fc { worker_pool& get_worker_pool(); } + class serial_valve { + private: + class ticket_guard { + public: + ticket_guard( boost::atomic*>& latch ); + ~ticket_guard(); + void wait_for_my_turn(); + private: + promise* my_promise; + future* ticket; + }; + + friend class ticket_guard; + boost::atomic*> latch; + + public: + serial_valve(); + ~serial_valve(); + + /** Executes f1() then f2(). + * For any two calls do_serial(f1,f2) and do_serial(f1',f2') where + * do_serial(f1,f2) is invoked before do_serial(f1',f2'), it is + * guaranteed that f2' will be executed after f2 has completed. Failure + * of either function counts as completion of both. + * If f1 throws then f2 will not be invoked. + * + * @param f1 a functor to invoke + * @param f2 a functor to invoke + * @return the return value of f2() + */ + template + auto do_serial( const Functor1& f1, const Functor2& f2 ) -> decltype(f2()) + { + ticket_guard guard( latch ); + f1(); + guard.wait_for_my_turn(); + return f2(); + } + }; + /** * Calls function f in a separate thread and returns a future * that can be used to wait on the result. diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp index 046b283..5e16961 100644 --- a/src/thread/parallel.cpp +++ b/src/thread/parallel.cpp @@ -151,4 +151,48 @@ namespace fc { return the_pool; } } -} + + serial_valve::ticket_guard::ticket_guard( boost::atomic*>& latch ) + { + my_promise = new promise(); + future* my_future = new future( promise::ptr( my_promise, true ) ); + try + { + do + { + ticket = latch.load(); + FC_ASSERT( ticket, "Valve is shutting down!" ); + } + while( !latch.compare_exchange_weak( ticket, my_future ) ); + } + catch (...) + { + delete my_future; // this takes care of my_promise as well + throw; + } + } + + serial_valve::ticket_guard::~ticket_guard() + { + my_promise->set_value(); + ticket->wait(); + delete ticket; + } + + void serial_valve::ticket_guard::wait_for_my_turn() + { + ticket->wait(); + } + + serial_valve::serial_valve() + { + latch.store( new future( promise::ptr( new promise( true ), true ) ) ); + } + + serial_valve::~serial_valve() + { + fc::future* last = latch.exchange( 0 ); + last->wait(); + delete last; + } +} // namespace fc diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp index e1ad2b7..144175b 100644 --- a/tests/thread/parallel_tests.cpp +++ b/tests/thread/parallel_tests.cpp @@ -177,4 +177,149 @@ BOOST_AUTO_TEST_CASE( sign_verify_parallel ) } } +BOOST_AUTO_TEST_CASE( serial_valve ) +{ + boost::atomic counter(0); + fc::serial_valve valve; + + { // Simple test, f2 finishes before f1 + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + auto p1 = fc::async([&counter,&valve,syncer,waiter] () { + valve.do_serial( [syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p2.f1 has started executing and p2.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p2.wait(); // and wait for p2.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK_EQUAL( 2, counter.load() ); + } + + { // Triple test, f3 finishes first, then f1, finally f2 + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + counter.store(0); + auto p1 = fc::async([&counter,&valve,syncer,waiter] () { + valve.do_serial( [&syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [&syncer](){ syncer->set_value(); + fc::usleep( fc::milliseconds(100) ); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p2.f1 has started executing and is sleeping + + syncer = new fc::promise(); + auto p3 = fc::async([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p3.f1 has started executing and p3.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + BOOST_CHECK( !p3.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p3.wait(); // and wait for p3.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK( p3.ready() ); + BOOST_CHECK_EQUAL( 3, counter.load() ); + } + + { // Triple test again but with invocations from different threads + fc::promise* syncer = new fc::promise(); + fc::promise* waiter = new fc::promise(); + counter.store(0); + auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () { + valve.do_serial( [&syncer,waiter](){ syncer->set_value(); + fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + [&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p1.f1 has started executing and is waiting on waiter + + syncer = new fc::promise(); + auto p2 = fc::do_parallel([&counter,&valve,syncer] () { + valve.do_serial( [&syncer](){ syncer->set_value(); + fc::usleep( fc::milliseconds(100) ); }, + [&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + // at this point, p2.f1 has started executing and is sleeping + + syncer = new fc::promise(); + auto p3 = fc::do_parallel([&counter,&valve,syncer] () { + valve.do_serial( [syncer](){ syncer->set_value(); }, + [&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() ); + counter.fetch_add(1); } ); + }); + fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + + fc::usleep( fc::milliseconds(10) ); + + // at this point, p3.f1 has started executing and p3.f2 is waiting for its turn + + BOOST_CHECK( !p1.ready() ); + BOOST_CHECK( !p2.ready() ); + BOOST_CHECK( !p3.ready() ); + + waiter->set_value(); // signal p1.f1 to continue + + p3.wait(); // and wait for p3.f2 to complete + + BOOST_CHECK( p1.ready() ); + BOOST_CHECK( p2.ready() ); + BOOST_CHECK( p3.ready() ); + BOOST_CHECK_EQUAL( 3, counter.load() ); + } +} + BOOST_AUTO_TEST_SUITE_END()