Updates from BitShares FC #22
3 changed files with 230 additions and 1 deletions
|
|
@ -45,6 +45,46 @@ namespace fc {
|
|||
worker_pool& get_worker_pool();
|
||||
}
|
||||
|
||||
class serial_valve {
|
||||
private:
|
||||
class ticket_guard {
|
||||
public:
|
||||
ticket_guard( boost::atomic<future<void>*>& latch );
|
||||
~ticket_guard();
|
||||
void wait_for_my_turn();
|
||||
private:
|
||||
promise<void>* my_promise;
|
||||
future<void>* ticket;
|
||||
};
|
||||
|
||||
friend class ticket_guard;
|
||||
boost::atomic<future<void>*> latch;
|
||||
|
||||
public:
|
||||
serial_valve();
|
||||
~serial_valve();
|
||||
|
||||
/** Executes f1() then f2().
|
||||
* For any two calls do_serial(f1,f2) and do_serial(f1',f2') where
|
||||
* do_serial(f1,f2) is invoked before do_serial(f1',f2'), it is
|
||||
* guaranteed that f2' will be executed after f2 has completed. Failure
|
||||
* of either function counts as completion of both.
|
||||
* If f1 throws then f2 will not be invoked.
|
||||
*
|
||||
* @param f1 a functor to invoke
|
||||
* @param f2 a functor to invoke
|
||||
* @return the return value of f2()
|
||||
*/
|
||||
template<typename Functor1,typename Functor2>
|
||||
auto do_serial( const Functor1& f1, const Functor2& f2 ) -> decltype(f2())
|
||||
{
|
||||
ticket_guard guard( latch );
|
||||
f1();
|
||||
guard.wait_for_my_turn();
|
||||
return f2();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Calls function <code>f</code> in a separate thread and returns a future
|
||||
* that can be used to wait on the result.
|
||||
|
|
|
|||
|
|
@ -151,4 +151,48 @@ namespace fc {
|
|||
return the_pool;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
serial_valve::ticket_guard::ticket_guard( boost::atomic<future<void>*>& latch )
|
||||
{
|
||||
my_promise = new promise<void>();
|
||||
future<void>* my_future = new future<void>( promise<void>::ptr( my_promise, true ) );
|
||||
try
|
||||
{
|
||||
do
|
||||
{
|
||||
ticket = latch.load();
|
||||
FC_ASSERT( ticket, "Valve is shutting down!" );
|
||||
}
|
||||
while( !latch.compare_exchange_weak( ticket, my_future ) );
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
delete my_future; // this takes care of my_promise as well
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
serial_valve::ticket_guard::~ticket_guard()
|
||||
{
|
||||
my_promise->set_value();
|
||||
ticket->wait();
|
||||
delete ticket;
|
||||
}
|
||||
|
||||
void serial_valve::ticket_guard::wait_for_my_turn()
|
||||
{
|
||||
ticket->wait();
|
||||
}
|
||||
|
||||
serial_valve::serial_valve()
|
||||
{
|
||||
latch.store( new future<void>( promise<void>::ptr( new promise<void>( true ), true ) ) );
|
||||
}
|
||||
|
||||
serial_valve::~serial_valve()
|
||||
{
|
||||
fc::future<void>* last = latch.exchange( 0 );
|
||||
last->wait();
|
||||
delete last;
|
||||
}
|
||||
} // namespace fc
|
||||
|
|
|
|||
|
|
@ -177,4 +177,149 @@ BOOST_AUTO_TEST_CASE( sign_verify_parallel )
|
|||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( serial_valve )
|
||||
{
|
||||
boost::atomic<uint32_t> counter(0);
|
||||
fc::serial_valve valve;
|
||||
|
||||
{ // Simple test, f2 finishes before f1
|
||||
fc::promise<void>* syncer = new fc::promise<void>();
|
||||
fc::promise<void>* waiter = new fc::promise<void>();
|
||||
auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
|
||||
valve.do_serial( [syncer,waiter](){ syncer->set_value();
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
// at this point, p1.f1 has started executing and is waiting on waiter
|
||||
|
||||
syncer = new fc::promise<void>();
|
||||
auto p2 = fc::async([&counter,&valve,syncer] () {
|
||||
valve.do_serial( [syncer](){ syncer->set_value(); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
fc::usleep( fc::milliseconds(10) );
|
||||
|
||||
// at this point, p2.f1 has started executing and p2.f2 is waiting for its turn
|
||||
|
||||
BOOST_CHECK( !p1.ready() );
|
||||
BOOST_CHECK( !p2.ready() );
|
||||
|
||||
waiter->set_value(); // signal p1.f1 to continue
|
||||
|
||||
p2.wait(); // and wait for p2.f2 to complete
|
||||
|
||||
BOOST_CHECK( p1.ready() );
|
||||
BOOST_CHECK( p2.ready() );
|
||||
BOOST_CHECK_EQUAL( 2, counter.load() );
|
||||
}
|
||||
|
||||
{ // Triple test, f3 finishes first, then f1, finally f2
|
||||
fc::promise<void>* syncer = new fc::promise<void>();
|
||||
fc::promise<void>* waiter = new fc::promise<void>();
|
||||
counter.store(0);
|
||||
auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
|
||||
valve.do_serial( [&syncer,waiter](){ syncer->set_value();
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
// at this point, p1.f1 has started executing and is waiting on waiter
|
||||
|
||||
syncer = new fc::promise<void>();
|
||||
auto p2 = fc::async([&counter,&valve,syncer] () {
|
||||
valve.do_serial( [&syncer](){ syncer->set_value();
|
||||
fc::usleep( fc::milliseconds(100) ); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
// at this point, p2.f1 has started executing and is sleeping
|
||||
|
||||
syncer = new fc::promise<void>();
|
||||
auto p3 = fc::async([&counter,&valve,syncer] () {
|
||||
valve.do_serial( [syncer](){ syncer->set_value(); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
fc::usleep( fc::milliseconds(10) );
|
||||
|
||||
// at this point, p3.f1 has started executing and p3.f2 is waiting for its turn
|
||||
|
||||
BOOST_CHECK( !p1.ready() );
|
||||
BOOST_CHECK( !p2.ready() );
|
||||
BOOST_CHECK( !p3.ready() );
|
||||
|
||||
waiter->set_value(); // signal p1.f1 to continue
|
||||
|
||||
p3.wait(); // and wait for p3.f2 to complete
|
||||
|
||||
BOOST_CHECK( p1.ready() );
|
||||
BOOST_CHECK( p2.ready() );
|
||||
BOOST_CHECK( p3.ready() );
|
||||
BOOST_CHECK_EQUAL( 3, counter.load() );
|
||||
}
|
||||
|
||||
{ // Triple test again but with invocations from different threads
|
||||
fc::promise<void>* syncer = new fc::promise<void>();
|
||||
fc::promise<void>* waiter = new fc::promise<void>();
|
||||
counter.store(0);
|
||||
auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () {
|
||||
valve.do_serial( [&syncer,waiter](){ syncer->set_value();
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
// at this point, p1.f1 has started executing and is waiting on waiter
|
||||
|
||||
syncer = new fc::promise<void>();
|
||||
auto p2 = fc::do_parallel([&counter,&valve,syncer] () {
|
||||
valve.do_serial( [&syncer](){ syncer->set_value();
|
||||
fc::usleep( fc::milliseconds(100) ); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
// at this point, p2.f1 has started executing and is sleeping
|
||||
|
||||
syncer = new fc::promise<void>();
|
||||
auto p3 = fc::do_parallel([&counter,&valve,syncer] () {
|
||||
valve.do_serial( [syncer](){ syncer->set_value(); },
|
||||
[&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() );
|
||||
counter.fetch_add(1); } );
|
||||
});
|
||||
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
|
||||
|
||||
fc::usleep( fc::milliseconds(10) );
|
||||
|
||||
// at this point, p3.f1 has started executing and p3.f2 is waiting for its turn
|
||||
|
||||
BOOST_CHECK( !p1.ready() );
|
||||
BOOST_CHECK( !p2.ready() );
|
||||
BOOST_CHECK( !p3.ready() );
|
||||
|
||||
waiter->set_value(); // signal p1.f1 to continue
|
||||
|
||||
p3.wait(); // and wait for p3.f2 to complete
|
||||
|
||||
BOOST_CHECK( p1.ready() );
|
||||
BOOST_CHECK( p2.ready() );
|
||||
BOOST_CHECK( p3.ready() );
|
||||
BOOST_CHECK_EQUAL( 3, counter.load() );
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
|
|
|||
Loading…
Reference in a new issue