Updates from BitShares FC #22

Closed
nathanielhourt wants to merge 693 commits from dapp-support into latest-fc
20 changed files with 173 additions and 240 deletions
Showing only changes of commit 7cf371736b - Show all commits

View file

@ -211,7 +211,6 @@ set( fc_sources
src/thread/non_preemptable_scope_check.cpp src/thread/non_preemptable_scope_check.cpp
src/asio.cpp src/asio.cpp
src/string.cpp src/string.cpp
src/shared_ptr.cpp
src/stacktrace.cpp src/stacktrace.cpp
src/time.cpp src/time.cpp
src/utf8.cpp src/utf8.cpp

View file

@ -122,7 +122,7 @@ namespace asio {
template<typename AsyncReadStream, typename MutableBufferSequence> template<typename AsyncReadStream, typename MutableBufferSequence>
future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf) future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
{ {
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some")); promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
s.async_read_some(buf, detail::read_write_handler(completion_promise)); s.async_read_some(buf, detail::read_write_handler(completion_promise));
return completion_promise;//->wait(); return completion_promise;//->wait();
} }
@ -130,7 +130,7 @@ namespace asio {
template<typename AsyncReadStream> template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0) future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0)
{ {
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some")); promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
s.async_read_some(boost::asio::buffer(buffer + offset, length), s.async_read_some(boost::asio::buffer(buffer + offset, length),
detail::read_write_handler(completion_promise)); detail::read_write_handler(completion_promise));
return completion_promise;//->wait(); return completion_promise;//->wait();
@ -139,7 +139,7 @@ namespace asio {
template<typename AsyncReadStream> template<typename AsyncReadStream>
future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset) future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
{ {
promise<size_t>::ptr completion_promise(new promise<size_t>("fc::asio::async_read_some")); promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), s.async_read_some(boost::asio::buffer(buffer.get() + offset, length),
detail::read_write_handler_with_buffer(completion_promise, buffer)); detail::read_write_handler_with_buffer(completion_promise, buffer));
return completion_promise;//->wait(); return completion_promise;//->wait();
@ -179,7 +179,7 @@ namespace asio {
*/ */
template<typename AsyncWriteStream, typename ConstBufferSequence> template<typename AsyncWriteStream, typename ConstBufferSequence>
size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) { size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write")); promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write");
boost::asio::async_write(s, buf, detail::read_write_handler(p)); boost::asio::async_write(s, buf, detail::read_write_handler(p));
return p->wait(); return p->wait();
} }
@ -191,7 +191,7 @@ namespace asio {
*/ */
template<typename AsyncWriteStream, typename ConstBufferSequence> template<typename AsyncWriteStream, typename ConstBufferSequence>
future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some")); promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
s.async_write_some( buf, detail::read_write_handler(p)); s.async_write_some( buf, detail::read_write_handler(p));
return p; //->wait(); return p; //->wait();
} }
@ -199,7 +199,7 @@ namespace asio {
template<typename AsyncWriteStream> template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const char* buffer, future<size_t> write_some( AsyncWriteStream& s, const char* buffer,
size_t length, size_t offset = 0) { size_t length, size_t offset = 0) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some")); promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p)); s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p));
return p; //->wait(); return p; //->wait();
} }
@ -207,7 +207,7 @@ namespace asio {
template<typename AsyncWriteStream> template<typename AsyncWriteStream>
future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer, future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
size_t length, size_t offset ) { size_t length, size_t offset ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some")); promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer)); s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer));
return p; //->wait(); return p; //->wait();
} }
@ -250,7 +250,7 @@ namespace asio {
template<typename SocketType, typename AcceptorType> template<typename SocketType, typename AcceptorType>
void accept( AcceptorType& acc, SocketType& sock ) { void accept( AcceptorType& acc, SocketType& sock ) {
//promise<boost::system::error_code>::ptr p( new promise<boost::system::error_code>("fc::asio::tcp::accept") ); //promise<boost::system::error_code>::ptr p( new promise<boost::system::error_code>("fc::asio::tcp::accept") );
promise<void>::ptr p( new promise<void>("fc::asio::tcp::accept") ); promise<void>::ptr p = promise<void>::create("fc::asio::tcp::accept");
acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
p->wait(); p->wait();
//if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
@ -262,7 +262,7 @@ namespace asio {
*/ */
template<typename AsyncSocket, typename EndpointType> template<typename AsyncSocket, typename EndpointType>
void connect( AsyncSocket& sock, const EndpointType& ep ) { void connect( AsyncSocket& sock, const EndpointType& ep ) {
promise<void>::ptr p(new promise<void>("fc::asio::tcp::connect")); promise<void>::ptr p = promise<void>::create("fc::asio::tcp::connect");
sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
p->wait(); p->wait();
//if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );

View file

@ -1,92 +0,0 @@
#pragma once
#include <cstdint>
#include <utility>
namespace fc {
/**
* @brief used to create reference counted types.
*
* Must be a virtual base class that is initialized with the
*
*/
class retainable {
public:
retainable();
void retain();
void release();
int32_t retain_count()const;
protected:
virtual ~retainable();
private:
volatile int32_t _ref_count;
};
template<typename T>
class shared_ptr {
public:
template<typename Other>
shared_ptr( const shared_ptr<Other>& o )
:_ptr(o.get()) {
if(_ptr != nullptr ) _ptr->retain();
}
shared_ptr( const shared_ptr& o )
:_ptr(o.get()) {
if(_ptr != nullptr ) _ptr->retain();
}
shared_ptr( T* t, bool inc = false )
:_ptr(t) { if( inc && t != nullptr) t->retain(); }
shared_ptr():_ptr(nullptr){}
shared_ptr( shared_ptr&& p )
:_ptr(p._ptr){ p._ptr = nullptr; }
~shared_ptr() { if( nullptr != _ptr ) { _ptr->release(); _ptr = nullptr; } }
shared_ptr& reset( T* v = nullptr, bool inc = false ) {
if( v == _ptr ) return *this;
if( inc && nullptr != v ) v->retain();
if( nullptr != _ptr ) _ptr->release();
_ptr = v;
return *this;
}
shared_ptr& operator=(const shared_ptr& p ) {
if( _ptr == p._ptr ) return *this;
if( p._ptr != nullptr ) p._ptr->retain();
if( _ptr != nullptr ) _ptr->release();
_ptr = p._ptr;
return *this;
}
shared_ptr& operator=(shared_ptr&& p ) {
std::swap(_ptr,p._ptr);
return *this;
}
T& operator* ()const { return *_ptr; }
T* operator-> ()const { return _ptr; }
bool operator==( const shared_ptr& p )const { return get() == p.get(); }
bool operator<( const shared_ptr& p )const { return get() < p.get(); }
T * get() const { return _ptr; }
bool operator!()const { return _ptr == 0; }
operator bool()const { return _ptr != 0; }
private:
T* _ptr;
};
template<typename T, typename O>
fc::shared_ptr<T> dynamic_pointer_cast( const fc::shared_ptr<O>& t ) {
return fc::shared_ptr<T>( dynamic_cast<T*>(t.get()), true );
}
template<typename T, typename O>
fc::shared_ptr<T> static_pointer_cast( const fc::shared_ptr<O>& t ) {
return fc::shared_ptr<T>( static_cast<T*>(t.get()), true );
}
}

View file

@ -36,13 +36,13 @@ namespace fc {
template<typename T> template<typename T>
inline T wait( boost::signals2::signal<void(T)>& sig, const microseconds& timeout_us=microseconds::maximum() ) { inline T wait( boost::signals2::signal<void(T)>& sig, const microseconds& timeout_us=microseconds::maximum() ) {
typename promise<T>::ptr p(new promise<T>("fc::signal::wait")); typename promise<T>::ptr p = promise<T>::create("fc::signal::wait");
boost::signals2::scoped_connection c( sig.connect( [=]( T t ) { p->set_value(t); } )); boost::signals2::scoped_connection c( sig.connect( [=]( T t ) { p->set_value(t); } ));
return p->wait( timeout_us ); return p->wait( timeout_us );
} }
inline void wait( boost::signals2::signal<void()>& sig, const microseconds& timeout_us=microseconds::maximum() ) { inline void wait( boost::signals2::signal<void()>& sig, const microseconds& timeout_us=microseconds::maximum() ) {
promise<void>::ptr p(new promise<void>("fc::signal::wait")); promise<void>::ptr p = promise<void>::create("fc::signal::wait");
boost::signals2::scoped_connection c( sig.connect( [=]() { p->set_value(); } )); boost::signals2::scoped_connection c( sig.connect( [=]() { p->set_value(); } ));
p->wait( timeout_us ); p->wait( timeout_us );
} }

View file

@ -1,9 +1,11 @@
#pragma once #pragma once
#include <fc/time.hpp> #include <fc/time.hpp>
#include <fc/shared_ptr.hpp>
#include <fc/exception/exception.hpp> #include <fc/exception/exception.hpp>
#include <fc/thread/spin_yield_lock.hpp> #include <fc/thread/spin_yield_lock.hpp>
#include <fc/optional.hpp> #include <fc/optional.hpp>
#include <memory>
#include <boost/atomic.hpp>
//#define FC_TASK_NAMES_ARE_MANDATORY 1 //#define FC_TASK_NAMES_ARE_MANDATORY 1
#ifdef FC_TASK_NAMES_ARE_MANDATORY #ifdef FC_TASK_NAMES_ARE_MANDATORY
@ -56,10 +58,10 @@ namespace fc {
}; };
} }
class promise_base : public virtual retainable{ class promise_base : public std::enable_shared_from_this<promise_base> {
public: public:
typedef fc::shared_ptr<promise_base> ptr; typedef std::shared_ptr<promise_base> ptr;
promise_base(const char* desc FC_TASK_NAME_DEFAULT_ARG); virtual ~promise_base();
const char* get_desc()const; const char* get_desc()const;
@ -70,7 +72,12 @@ namespace fc {
void set_exception( const fc::exception_ptr& e ); void set_exception( const fc::exception_ptr& e );
void retain();
void release();
protected: protected:
promise_base(const char* desc FC_TASK_NAME_DEFAULT_ARG);
void _wait( const microseconds& timeout_us ); void _wait( const microseconds& timeout_us );
void _wait_until( const time_point& timeout_us ); void _wait_until( const time_point& timeout_us );
void _enqueue_thread(); void _enqueue_thread();
@ -80,7 +87,6 @@ namespace fc {
void _set_value(const void* v); void _set_value(const void* v);
void _on_complete( detail::completion_handler* c ); void _on_complete( detail::completion_handler* c );
~promise_base();
private: private:
friend class thread; friend class thread;
@ -101,15 +107,28 @@ namespace fc {
#endif #endif
const char* _desc; const char* _desc;
detail::completion_handler* _compl; detail::completion_handler* _compl;
std::shared_ptr<promise_base> _self;
boost::atomic<int32_t> _retain_count;
}; };
template<typename T = void> template<typename T = void>
class promise : virtual public promise_base { class promise : virtual public promise_base {
public: public:
typedef fc::shared_ptr< promise<T> > ptr; typedef std::shared_ptr< promise<T> > ptr;
promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} virtual ~promise(){}
promise( const T& val ){ set_value(val); }
promise( T&& val ){ set_value(std::move(val) ); } static ptr create( const char* desc FC_TASK_NAME_DEFAULT_ARG )
{
return ptr( new promise<T>( desc ) );
}
static ptr create( const T& val )
{
return ptr( new promise<T>( val ) );
}
static ptr create( T&& val )
{
return ptr( new promise<T>( std::move(val) ) );
}
const T& wait(const microseconds& timeout = microseconds::maximum() ){ const T& wait(const microseconds& timeout = microseconds::maximum() ){
this->_wait( timeout ); this->_wait( timeout );
@ -135,17 +154,27 @@ namespace fc {
_on_complete( new detail::completion_handler_impl<CompletionHandler,T>(fc::forward<CompletionHandler>(c)) ); _on_complete( new detail::completion_handler_impl<CompletionHandler,T>(fc::forward<CompletionHandler>(c)) );
} }
protected: protected:
promise( const char* desc ):promise_base(desc){}
promise( const T& val ){ set_value(val); }
promise( T&& val ){ set_value(std::move(val) ); }
optional<T> result; optional<T> result;
~promise(){}
}; };
template<> template<>
class promise<void> : virtual public promise_base { class promise<void> : virtual public promise_base {
public: public:
typedef fc::shared_ptr< promise<void> > ptr; typedef std::shared_ptr< promise<void> > ptr;
promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){}
promise( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG ){ virtual ~promise(){}
if( fulfilled ) set_value();
static ptr create( const char* desc FC_TASK_NAME_DEFAULT_ARG )
{
return ptr( new promise<void>( desc ) );
}
static ptr create( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG )
{
return ptr( new promise<void>( fulfilled, desc ) );
} }
void wait(const microseconds& timeout = microseconds::maximum() ){ void wait(const microseconds& timeout = microseconds::maximum() ){
@ -163,7 +192,10 @@ namespace fc {
_on_complete( new detail::completion_handler_impl<CompletionHandler,void>(fc::forward<CompletionHandler>(c)) ); _on_complete( new detail::completion_handler_impl<CompletionHandler,void>(fc::forward<CompletionHandler>(c)) );
} }
protected: protected:
~promise(){} promise( const char* desc ):promise_base(desc){}
promise( bool fulfilled, const char* desc ){
if( fulfilled ) set_value();
}
}; };
/** /**
@ -184,8 +216,8 @@ namespace fc {
template<typename T> template<typename T>
class future { class future {
public: public:
future( const fc::shared_ptr<promise<T>>& p ):m_prom(p){} future( const typename promise<T>::ptr& p ):m_prom(p){}
future( fc::shared_ptr<promise<T>>&& p ):m_prom(std::move(p)){} future( typename promise<T>::ptr&& p ):m_prom(std::move(p)){}
future(const future<T>& f ) : m_prom(f.m_prom){} future(const future<T>& f ) : m_prom(f.m_prom){}
future(){} future(){}
@ -194,7 +226,6 @@ namespace fc {
return *this; return *this;
} }
operator const T&()const { return wait(); } operator const T&()const { return wait(); }
/// @pre valid() /// @pre valid()
@ -251,14 +282,14 @@ namespace fc {
} }
private: private:
friend class thread; friend class thread;
fc::shared_ptr<promise<T>> m_prom; typename promise<T>::ptr m_prom;
}; };
template<> template<>
class future<void> { class future<void> {
public: public:
future( const fc::shared_ptr<promise<void>>& p ):m_prom(p){} future( const typename promise<void>::ptr& p ):m_prom(p){}
future( fc::shared_ptr<promise<void>>&& p ):m_prom(std::move(p)){} future( typename promise<void>::ptr&& p ):m_prom(std::move(p)){}
future(const future<void>& f ) : m_prom(f.m_prom){} future(const future<void>& f ) : m_prom(f.m_prom){}
future(){} future(){}
@ -313,7 +344,7 @@ namespace fc {
private: private:
friend class thread; friend class thread;
fc::shared_ptr<promise<void>> m_prom; typename promise<void>::ptr m_prom;
}; };
} }

View file

@ -55,7 +55,7 @@ namespace fc {
~ticket_guard(); ~ticket_guard();
void wait_for_my_turn(); void wait_for_my_turn();
private: private:
promise<void>* my_promise; promise<void>::ptr my_promise;
future<void>* ticket; future<void>* ticket;
}; };
@ -97,10 +97,10 @@ namespace fc {
auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future<decltype(f())> { auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future<decltype(f())> {
typedef decltype(f()) Result; typedef decltype(f()) Result;
typedef typename fc::deduce<Functor>::type FunctorType; typedef typename fc::deduce<Functor>::type FunctorType;
fc::task<Result,sizeof(FunctorType)>* tsk = typename task<Result,sizeof(FunctorType)>::ptr tsk =
new fc::task<Result,sizeof(FunctorType)>( fc::forward<Functor>(f), desc ); task<Result,sizeof(FunctorType)>::create( fc::forward<Functor>(f), desc );
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) ); fc::future<Result> r( std::dynamic_pointer_cast< promise<Result> >(tsk) );
detail::get_worker_pool().post( tsk ); detail::get_worker_pool().post( tsk.get() );
return r; return r;
} }
} }

View file

@ -31,9 +31,9 @@ namespace fc {
public: public:
void run(); void run();
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override; virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override;
~task_base();
protected: protected:
~task_base();
/// Task priority looks like unsupported feature. /// Task priority looks like unsupported feature.
uint64_t _posted_num; uint64_t _posted_num;
priority _prio; priority _prio;
@ -90,6 +90,19 @@ namespace fc {
template<typename R,uint64_t FunctorSize=64> template<typename R,uint64_t FunctorSize=64>
class task : virtual public task_base, virtual public promise<R> { class task : virtual public task_base, virtual public promise<R> {
public: public:
typedef std::shared_ptr<task<R,FunctorSize>> ptr;
virtual ~task(){}
template<typename Functor>
static ptr create( Functor&& f, const char* desc )
{
return ptr( new task<R,FunctorSize>( std::move(f), desc ) );
}
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); }
alignas(double) char _functor[FunctorSize];
private:
template<typename Functor> template<typename Functor>
task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise<R>(desc) { task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise<R>(desc) {
typedef typename fc::deduce<Functor>::type FunctorType; typedef typename fc::deduce<Functor>::type FunctorType;
@ -100,16 +113,24 @@ namespace fc {
_promise_impl = static_cast<promise<R>*>(this); _promise_impl = static_cast<promise<R>*>(this);
_run_functor = &detail::functor_run<FunctorType>::run; _run_functor = &detail::functor_run<FunctorType>::run;
} }
};
template<uint64_t FunctorSize>
class task<void,FunctorSize> : public task_base, public promise<void> {
public:
typedef std::shared_ptr<task<void,FunctorSize>> ptr;
virtual ~task(){}
template<typename Functor>
static ptr create( Functor&& f, const char* desc )
{
return ptr( new task<void,FunctorSize>( std::move(f), desc ) );
}
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); } virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); }
alignas(double) char _functor[FunctorSize]; alignas(double) char _functor[FunctorSize];
private: private:
~task(){}
};
template<uint64_t FunctorSize>
class task<void,FunctorSize> : virtual public task_base, virtual public promise<void> {
public:
template<typename Functor> template<typename Functor>
task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise<void>(desc) { task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise<void>(desc) {
typedef typename fc::deduce<Functor>::type FunctorType; typedef typename fc::deduce<Functor>::type FunctorType;
@ -120,11 +141,6 @@ namespace fc {
_promise_impl = static_cast<promise<void>*>(this); _promise_impl = static_cast<promise<void>*>(this);
_run_functor = &detail::void_functor_run<FunctorType>::run; _run_functor = &detail::void_functor_run<FunctorType>::run;
} }
virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); }
alignas(double) char _functor[FunctorSize];
private:
~task(){}
}; };
} }

View file

@ -87,10 +87,10 @@ namespace fc {
auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> { auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
typedef decltype(f()) Result; typedef decltype(f()) Result;
typedef typename fc::deduce<Functor>::type FunctorType; typedef typename fc::deduce<Functor>::type FunctorType;
fc::task<Result,sizeof(FunctorType)>* tsk = typename task<Result,sizeof(FunctorType)>::ptr tsk =
new fc::task<Result,sizeof(FunctorType)>( fc::forward<Functor>(f), desc ); task<Result,sizeof(FunctorType)>::create( fc::forward<Functor>(f), desc );
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) ); fc::future<Result> r( std::dynamic_pointer_cast< promise<Result> >(tsk) );
async_task(tsk,prio); async_task(tsk.get(),prio);
return r; return r;
} }
void poke(); void poke();
@ -109,10 +109,10 @@ namespace fc {
auto schedule( Functor&& f, const fc::time_point& when, auto schedule( Functor&& f, const fc::time_point& when,
const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> { const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
typedef decltype(f()) Result; typedef decltype(f()) Result;
fc::task<Result,sizeof(Functor)>* tsk = typename task<Result,sizeof(Functor)>::ptr tsk =
new fc::task<Result,sizeof(Functor)>( fc::forward<Functor>(f), desc ); task<Result,sizeof(Functor)>::create( fc::forward<Functor>(f), desc );
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) ); fc::future<Result> r( std::dynamic_pointer_cast< promise<Result> >(tsk) );
async_task(tsk,prio,when); async_task(tsk.get(),prio,when);
return r; return r;
} }
@ -147,8 +147,8 @@ namespace fc {
template<typename T1, typename T2> template<typename T1, typename T2>
int wait_any( const fc::future<T1>& f1, const fc::future<T2>& f2, const microseconds& timeout_us = microseconds::maximum()) { int wait_any( const fc::future<T1>& f1, const fc::future<T2>& f2, const microseconds& timeout_us = microseconds::maximum()) {
std::vector<fc::promise_base::ptr> proms(2); std::vector<fc::promise_base::ptr> proms(2);
proms[0] = fc::static_pointer_cast<fc::promise_base>(f1.m_prom); proms[0] = std::static_pointer_cast<fc::promise_base>(f1.m_prom);
proms[1] = fc::static_pointer_cast<fc::promise_base>(f2.m_prom); proms[1] = std::static_pointer_cast<fc::promise_base>(f2.m_prom);
return wait_any_until(std::move(proms), fc::time_point::now()+timeout_us ); return wait_any_until(std::move(proms), fc::time_point::now()+timeout_us );
} }
private: private:

View file

@ -190,7 +190,7 @@ namespace fc {
try try
{ {
resolver res( fc::asio::default_io_service() ); resolver res( fc::asio::default_io_service() );
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") ); promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p = promise<std::vector<boost::asio::ip::tcp::endpoint> >::create("tcp::resolve completion");
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) ); boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait(); return p->wait();
@ -204,7 +204,7 @@ namespace fc {
try try
{ {
resolver res( fc::asio::default_io_service() ); resolver res( fc::asio::default_io_service() );
promise<std::vector<endpoint> >::ptr p( new promise<std::vector<endpoint> >("udp::resolve completion") ); promise<std::vector<endpoint> >::ptr p = promise<std::vector<endpoint> >::create("udp::resolve completion");
res.async_resolve( resolver::query(hostname,port), res.async_resolve( resolver::query(hostname,port),
boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) ); boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait(); return p->wait();

View file

@ -24,7 +24,7 @@ namespace fc {
std::cin.read(&c,1); std::cin.read(&c,1);
while( !std::cin.eof() ) { while( !std::cin.eof() ) {
while( write_pos - read_pos > 0xfffff ) { while( write_pos - read_pos > 0xfffff ) {
fc::promise<void>::ptr wr( new fc::promise<void>("cin_buffer::write_ready") ); fc::promise<void>::ptr wr = fc::promise<void>::create("cin_buffer::write_ready");
write_ready = wr; write_ready = wr;
if( write_pos - read_pos <= 0xfffff ) { if( write_pos - read_pos <= 0xfffff ) {
wr->wait(); wr->wait();
@ -138,7 +138,7 @@ namespace fc {
do { do {
while( !b.eof && (b.write_pos - b.read_pos)==0 ){ while( !b.eof && (b.write_pos - b.read_pos)==0 ){
// wait for more... // wait for more...
fc::promise<void>::ptr rr( new fc::promise<void>("cin_buffer::read_ready") ); fc::promise<void>::ptr rr = fc::promise<void>::create("cin_buffer::read_ready");
{ // copy read_ready because it is accessed from multiple threads { // copy read_ready because it is accessed from multiple threads
fc::scoped_lock<boost::mutex> lock( b.read_ready_mutex ); fc::scoped_lock<boost::mutex> lock( b.read_ready_mutex );
b.read_ready = rr; b.read_ready = rr;

View file

@ -292,8 +292,8 @@ namespace fc { namespace http {
if( _server.is_listening() ) if( _server.is_listening() )
_server.stop_listening(); _server.stop_listening();
if ( _connections.size() ) if( _connections.size() )
_closed = new fc::promise<void>(); _closed = promise<void>::create();
auto cpy_con = _connections; auto cpy_con = _connections;
for( auto item : cpy_con ) for( auto item : cpy_con )
@ -642,13 +642,13 @@ namespace fc { namespace http {
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
my->_uri = uri; my->_uri = uri;
my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") ); my->_connected = promise<void>::create("websocket::connect");
my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
my->_hdl = hdl; my->_hdl = hdl;
auto con = my->_client.get_con_from_hdl(hdl); auto con = my->_client.get_con_from_hdl(hdl);
my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_client_connection_type>>( con ); my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_client_connection_type>>( con );
my->_closed = fc::promise<void>::ptr( new fc::promise<void>("websocket::closed") ); my->_closed = promise<void>::create("websocket::closed");
my->_connected->set_value(); my->_connected->set_value();
}); });
@ -670,12 +670,12 @@ namespace fc { namespace http {
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
smy->_uri = uri; smy->_uri = uri;
smy->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") ); smy->_connected = promise<void>::create("websocket::connect");
smy->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ smy->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
auto con = smy->_client.get_con_from_hdl(hdl); auto con = smy->_client.get_con_from_hdl(hdl);
smy->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_tls_client_connection_type>>( con ); smy->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_tls_client_connection_type>>( con );
smy->_closed = fc::promise<void>::ptr( new fc::promise<void>("websocket::closed") ); smy->_closed = promise<void>::create("websocket::closed");
smy->_connected->set_value(); smy->_connected->set_value();
}); });
@ -705,12 +705,12 @@ namespace fc { namespace http {
// wlog( "connecting to ${uri}", ("uri",uri)); // wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") ); my->_connected = promise<void>::create("websocket::connect");
my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
auto con = my->_client.get_con_from_hdl(hdl); auto con = my->_client.get_con_from_hdl(hdl);
my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_tls_client_connection_type>>( con ); my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_tls_client_connection_type>>( con );
my->_closed = fc::promise<void>::ptr( new fc::promise<void>("websocket::closed") ); my->_closed = promise<void>::create("websocket::closed");
my->_connected->set_value(); my->_connected->set_value();
}); });

View file

@ -284,7 +284,7 @@ namespace fc
size_t bytes_read; size_t bytes_read;
if (_download_bytes_per_second) if (_download_bytes_per_second)
{ {
promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::readsome")); promise<size_t>::ptr completion_promise = promise<size_t>::create("rate_limiting_group_impl::readsome");
rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise); rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise);
_read_operations_for_next_iteration.push_back(&read_operation); _read_operations_for_next_iteration.push_back(&read_operation);
@ -330,7 +330,7 @@ namespace fc
size_t bytes_written; size_t bytes_written;
if (_upload_bytes_per_second) if (_upload_bytes_per_second)
{ {
promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::writesome")); promise<size_t>::ptr completion_promise = promise<size_t>::create("rate_limiting_group_impl::writesome");
rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise); rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise);
_write_operations_for_next_iteration.push_back(&write_operation); _write_operations_for_next_iteration.push_back(&write_operation);
@ -367,7 +367,7 @@ namespace fc
process_pending_operations(_last_read_iteration_time, _download_bytes_per_second, process_pending_operations(_last_read_iteration_time, _download_bytes_per_second,
_read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens); _read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens);
_new_read_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_reads"); _new_read_operation_available_promise = promise<void>::create("rate_limiting_group_impl::process_pending_reads");
try try
{ {
if (_read_operations_in_progress.empty()) if (_read_operations_in_progress.empty())
@ -388,7 +388,7 @@ namespace fc
process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second, process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second,
_write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens); _write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens);
_new_write_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_writes"); _new_write_operation_available_promise = promise<void>::create("rate_limiting_group_impl::process_pending_writes");
try try
{ {
if (_write_operations_in_progress.empty()) if (_write_operations_in_progress.empty())

View file

@ -56,7 +56,7 @@ namespace fc {
throw; throw;
} }
promise<size_t>::ptr completion_promise(new promise<size_t>("udp_socket::send_to")); promise<size_t>::ptr completion_promise = promise<size_t>::create("udp_socket::send_to");
my->_sock.async_send_to( boost::asio::buffer(buffer, length), to_asio_ep(to), my->_sock.async_send_to( boost::asio::buffer(buffer, length), to_asio_ep(to),
asio::detail::read_write_handler(completion_promise) ); asio::detail::read_write_handler(completion_promise) );
@ -76,7 +76,7 @@ namespace fc {
throw; throw;
} }
promise<size_t>::ptr completion_promise(new promise<size_t>("udp_socket::send_to")); promise<size_t>::ptr completion_promise = promise<size_t>::create("udp_socket::send_to");
my->_sock.async_send_to( boost::asio::buffer(buffer.get(), length), to_asio_ep(to), my->_sock.async_send_to( boost::asio::buffer(buffer.get(), length), to_asio_ep(to),
asio::detail::read_write_handler_with_buffer(completion_promise, buffer) ); asio::detail::read_write_handler_with_buffer(completion_promise, buffer) );
@ -111,7 +111,7 @@ namespace fc {
} }
boost::asio::ip::udp::endpoint boost_from_endpoint; boost::asio::ip::udp::endpoint boost_from_endpoint;
promise<size_t>::ptr completion_promise(new promise<size_t>("udp_socket::receive_from")); promise<size_t>::ptr completion_promise = promise<size_t>::create("udp_socket::receive_from");
my->_sock.async_receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length), my->_sock.async_receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length),
boost_from_endpoint, boost_from_endpoint,
asio::detail::read_write_handler_with_buffer(completion_promise, receive_buffer) ); asio::detail::read_write_handler_with_buffer(completion_promise, receive_buffer) );
@ -137,7 +137,7 @@ namespace fc {
} }
boost::asio::ip::udp::endpoint boost_from_endpoint; boost::asio::ip::udp::endpoint boost_from_endpoint;
promise<size_t>::ptr completion_promise(new promise<size_t>("udp_socket::receive_from")); promise<size_t>::ptr completion_promise = promise<size_t>::create("udp_socket::receive_from");
my->_sock.async_receive_from( boost::asio::buffer(receive_buffer, receive_buffer_length), boost_from_endpoint, my->_sock.async_receive_from( boost::asio::buffer(receive_buffer, receive_buffer_length), boost_from_endpoint,
asio::detail::read_write_handler(completion_promise) ); asio::detail::read_write_handler(completion_promise) );
size_t bytes_read = completion_promise->wait(); size_t bytes_read = completion_promise->wait();

View file

@ -46,7 +46,7 @@ void state::handle_reply( const response& response )
request state::start_remote_call( const string& method_name, variants args ) request state::start_remote_call( const string& method_name, variants args )
{ {
request request{ _next_id++, method_name, std::move(args) }; request request{ _next_id++, method_name, std::move(args) };
_awaiting[*request.id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") ); _awaiting[*request.id] = fc::promise<variant>::create("json_connection::async_call");
return request; return request;
} }
variant state::wait_for_response( const variant& request_id ) variant state::wait_for_response( const variant& request_id )

View file

@ -1,30 +0,0 @@
#include <fc/shared_ptr.hpp>
#include <boost/atomic.hpp>
#include <boost/memory_order.hpp>
#include <assert.h>
namespace fc {
retainable::retainable()
:_ref_count(1) {
static_assert( sizeof(_ref_count) == sizeof(boost::atomic<int32_t>), "failed to reserve enough space" );
}
retainable::~retainable() {
assert( _ref_count <= 0 );
assert( _ref_count == 0 );
}
void retainable::retain() {
((boost::atomic<int32_t>*)&_ref_count)->fetch_add(1, boost::memory_order_relaxed );
}
void retainable::release() {
boost::atomic_thread_fence(boost::memory_order_acquire);
if( 1 == ((boost::atomic<int32_t>*)&_ref_count)->fetch_sub(1, boost::memory_order_release ) ) {
delete this;
}
}
int32_t retainable::retain_count()const {
return _ref_count;
}
}

View file

@ -19,7 +19,8 @@ namespace fc {
_cancellation_reason(nullptr), _cancellation_reason(nullptr),
#endif #endif
_desc(desc), _desc(desc),
_compl(nullptr) _compl(nullptr),
_retain_count(0)
{ } { }
const char* promise_base::get_desc()const{ const char* promise_base::get_desc()const{
@ -72,7 +73,7 @@ namespace fc {
// See https://github.com/cryptonomex/graphene/issues/597 // See https://github.com/cryptonomex/graphene/issues/597
// //
ptr p_this = ptr( this, true ); ptr p_this = shared_from_this();
try try
{ {
@ -123,7 +124,7 @@ namespace fc {
blocked_thread = _blocked_thread; blocked_thread = _blocked_thread;
} }
if( blocked_thread ) if( blocked_thread )
blocked_thread->notify(ptr(this,true)); blocked_thread->notify( shared_from_this() );
} }
promise_base::~promise_base() { } promise_base::~promise_base() { }
void promise_base::_set_timeout(){ void promise_base::_set_timeout(){
@ -150,5 +151,13 @@ namespace fc {
_compl = c; _compl = c;
} }
} }
void promise_base::retain() {
if( _retain_count.fetch_add(1, boost::memory_order_relaxed) == 0 )
_self = shared_from_this();
}
void promise_base::release() {
if( _retain_count.fetch_sub(1, boost::memory_order_release) == 1 )
_self.reset();
}
} }

View file

@ -158,8 +158,8 @@ namespace fc {
serial_valve::ticket_guard::ticket_guard( boost::atomic<future<void>*>& latch ) serial_valve::ticket_guard::ticket_guard( boost::atomic<future<void>*>& latch )
{ {
my_promise = new promise<void>(); my_promise = promise<void>::create();
future<void>* my_future = new future<void>( promise<void>::ptr( my_promise, true ) ); future<void>* my_future = new future<void>( my_promise );
try try
{ {
do do
@ -171,7 +171,7 @@ namespace fc {
} }
catch (...) catch (...)
{ {
delete my_future; // this takes care of my_promise as well delete my_future;
throw; throw;
} }
} }
@ -190,7 +190,7 @@ namespace fc {
serial_valve::serial_valve() serial_valve::serial_valve()
{ {
latch.store( new future<void>( promise<void>::ptr( new promise<void>( true ), true ) ) ); latch.store( new future<void>( promise<void>::create( true ) ) );
} }
serial_valve::~serial_valve() serial_valve::~serial_valve()

View file

@ -72,7 +72,7 @@ namespace fc {
} }
thread::thread( const std::string& name, thread_idle_notifier* notifier ) { thread::thread( const std::string& name, thread_idle_notifier* notifier ) {
promise<void>::ptr p(new promise<void>("thread start")); promise<void>::ptr p = promise<void>::create("thread start");
boost::thread* t = new boost::thread( [this,p,name,notifier]() { boost::thread* t = new boost::thread( [this,p,name,notifier]() {
try { try {
set_thread_name(name.c_str()); // set thread's name for the debugger to display set_thread_name(name.c_str()); // set thread's name for the debugger to display

View file

@ -200,25 +200,25 @@ BOOST_AUTO_TEST_CASE( serial_valve )
fc::serial_valve valve; fc::serial_valve valve;
{ // Simple test, f2 finishes before f1 { // Simple test, f2 finishes before f1
fc::promise<void>* syncer = new fc::promise<void>(); fc::promise<void>::ptr syncer = fc::promise<void>::create();
fc::promise<void>* waiter = new fc::promise<void>(); fc::promise<void>::ptr waiter = fc::promise<void>::create();
auto p1 = fc::async([&counter,&valve,syncer,waiter] () { auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
valve.do_serial( [syncer,waiter](){ syncer->set_value(); valve.do_serial( [syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); }, fc::future<void>( waiter ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
// at this point, p1.f1 has started executing and is waiting on waiter // at this point, p1.f1 has started executing and is waiting on waiter
syncer = new fc::promise<void>(); syncer = fc::promise<void>::create();
auto p2 = fc::async([&counter,&valve,syncer] () { auto p2 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); }, valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
fc::usleep( fc::milliseconds(10) ); fc::usleep( fc::milliseconds(10) );
@ -237,37 +237,37 @@ BOOST_AUTO_TEST_CASE( serial_valve )
} }
{ // Triple test, f3 finishes first, then f1, finally f2 { // Triple test, f3 finishes first, then f1, finally f2
fc::promise<void>* syncer = new fc::promise<void>(); fc::promise<void>::ptr syncer = fc::promise<void>::create();
fc::promise<void>* waiter = new fc::promise<void>(); fc::promise<void>::ptr waiter = fc::promise<void>::create();
counter.store(0); counter.store(0);
auto p1 = fc::async([&counter,&valve,syncer,waiter] () { auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
valve.do_serial( [&syncer,waiter](){ syncer->set_value(); valve.do_serial( [&syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); }, fc::future<void>( waiter ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
// at this point, p1.f1 has started executing and is waiting on waiter // at this point, p1.f1 has started executing and is waiting on waiter
syncer = new fc::promise<void>(); syncer = fc::promise<void>::create();
auto p2 = fc::async([&counter,&valve,syncer] () { auto p2 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [&syncer](){ syncer->set_value(); valve.do_serial( [&syncer](){ syncer->set_value();
fc::usleep( fc::milliseconds(100) ); }, fc::usleep( fc::milliseconds(100) ); },
[&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
// at this point, p2.f1 has started executing and is sleeping // at this point, p2.f1 has started executing and is sleeping
syncer = new fc::promise<void>(); syncer = fc::promise<void>::create();
auto p3 = fc::async([&counter,&valve,syncer] () { auto p3 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); }, valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 2u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 2u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
fc::usleep( fc::milliseconds(10) ); fc::usleep( fc::milliseconds(10) );
@ -288,37 +288,37 @@ BOOST_AUTO_TEST_CASE( serial_valve )
} }
{ // Triple test again but with invocations from different threads { // Triple test again but with invocations from different threads
fc::promise<void>* syncer = new fc::promise<void>(); fc::promise<void>::ptr syncer = fc::promise<void>::create();
fc::promise<void>* waiter = new fc::promise<void>(); fc::promise<void>::ptr waiter = fc::promise<void>::create();
counter.store(0); counter.store(0);
auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () { auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () {
valve.do_serial( [&syncer,waiter](){ syncer->set_value(); valve.do_serial( [&syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); }, fc::future<void>( waiter ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
// at this point, p1.f1 has started executing and is waiting on waiter // at this point, p1.f1 has started executing and is waiting on waiter
syncer = new fc::promise<void>(); syncer = fc::promise<void>::create();
auto p2 = fc::do_parallel([&counter,&valve,syncer] () { auto p2 = fc::do_parallel([&counter,&valve,syncer] () {
valve.do_serial( [&syncer](){ syncer->set_value(); valve.do_serial( [&syncer](){ syncer->set_value();
fc::usleep( fc::milliseconds(100) ); }, fc::usleep( fc::milliseconds(100) ); },
[&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
// at this point, p2.f1 has started executing and is sleeping // at this point, p2.f1 has started executing and is sleeping
syncer = new fc::promise<void>(); syncer = fc::promise<void>::create();
auto p3 = fc::do_parallel([&counter,&valve,syncer] () { auto p3 = fc::do_parallel([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); }, valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 2u, counter.load() ); [&counter](){ BOOST_CHECK_EQUAL( 2u, counter.load() );
counter.fetch_add(1); } ); counter.fetch_add(1); } );
}); });
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait(); fc::future<void>( syncer ).wait();
fc::usleep( fc::milliseconds(10) ); fc::usleep( fc::milliseconds(10) );

View file

@ -136,7 +136,7 @@ BOOST_AUTO_TEST_CASE( cancel_a_task_waiting_on_promise )
{ {
enum task_result{task_completed, task_aborted}; enum task_result{task_completed, task_aborted};
fc::promise<void>::ptr promise_to_wait_on(new fc::promise<void>()); fc::promise<void>::ptr promise_to_wait_on = fc::promise<void>::create();
fc::future<task_result> task = fc::async([promise_to_wait_on]() { fc::future<task_result> task = fc::async([promise_to_wait_on]() {
BOOST_TEST_MESSAGE("Starting async task"); BOOST_TEST_MESSAGE("Starting async task");