From 7cf371736b3f06b769cfc0509f2f66afdbfaceaa Mon Sep 17 00:00:00 2001 From: Peter Conrad Date: Fri, 26 Apr 2019 14:21:11 +0200 Subject: [PATCH] Intermediate --- CMakeLists.txt | 1 - include/fc/asio.hpp | 18 +++---- include/fc/shared_ptr.hpp | 92 --------------------------------- include/fc/signals.hpp | 4 +- include/fc/thread/future.hpp | 83 +++++++++++++++++++---------- include/fc/thread/parallel.hpp | 12 ++--- include/fc/thread/task.hpp | 40 +++++++++----- include/fc/thread/thread.hpp | 20 +++---- src/asio.cpp | 4 +- src/io/iostream.cpp | 4 +- src/network/http/websocket.cpp | 16 +++--- src/network/rate_limiting.cpp | 8 +-- src/network/udp_socket.cpp | 8 +-- src/rpc/state.cpp | 2 +- src/shared_ptr.cpp | 30 ----------- src/thread/future.cpp | 15 ++++-- src/thread/parallel.cpp | 8 +-- src/thread/thread.cpp | 2 +- tests/thread/parallel_tests.cpp | 44 ++++++++-------- tests/thread/task_cancel.cpp | 2 +- 20 files changed, 173 insertions(+), 240 deletions(-) delete mode 100644 include/fc/shared_ptr.hpp delete mode 100644 src/shared_ptr.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f8fea3d..8fea7bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -211,7 +211,6 @@ set( fc_sources src/thread/non_preemptable_scope_check.cpp src/asio.cpp src/string.cpp - src/shared_ptr.cpp src/stacktrace.cpp src/time.cpp src/utf8.cpp diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index 06ba2f8..4c330c4 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -122,7 +122,7 @@ namespace asio { template future read_some(AsyncReadStream& s, const MutableBufferSequence& buf) { - promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + promise::ptr completion_promise = promise::create("fc::asio::async_read_some"); s.async_read_some(buf, detail::read_write_handler(completion_promise)); return completion_promise;//->wait(); } @@ -130,7 +130,7 @@ namespace asio { template future read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0) { - promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + promise::ptr completion_promise = promise::create("fc::asio::async_read_some"); s.async_read_some(boost::asio::buffer(buffer + offset, length), detail::read_write_handler(completion_promise)); return completion_promise;//->wait(); @@ -139,7 +139,7 @@ namespace asio { template future read_some(AsyncReadStream& s, const std::shared_ptr& buffer, size_t length, size_t offset) { - promise::ptr completion_promise(new promise("fc::asio::async_read_some")); + promise::ptr completion_promise = promise::create("fc::asio::async_read_some"); s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer)); return completion_promise;//->wait(); @@ -179,7 +179,7 @@ namespace asio { */ template size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) { - promise::ptr p(new promise("fc::asio::write")); + promise::ptr p = promise::create("fc::asio::write"); boost::asio::async_write(s, buf, detail::read_write_handler(p)); return p->wait(); } @@ -191,7 +191,7 @@ namespace asio { */ template future write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { - promise::ptr p(new promise("fc::asio::write_some")); + promise::ptr p = promise::create("fc::asio::write_some"); s.async_write_some( buf, detail::read_write_handler(p)); return p; //->wait(); } @@ -199,7 +199,7 @@ namespace asio { template future write_some( AsyncWriteStream& s, const char* buffer, size_t length, size_t offset = 0) { - promise::ptr p(new promise("fc::asio::write_some")); + promise::ptr p = promise::create("fc::asio::write_some"); s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p)); return p; //->wait(); } @@ -207,7 +207,7 @@ namespace asio { template future write_some( AsyncWriteStream& s, const std::shared_ptr& buffer, size_t length, size_t offset ) { - promise::ptr p(new promise("fc::asio::write_some")); + promise::ptr p = promise::create("fc::asio::write_some"); s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer)); return p; //->wait(); } @@ -250,7 +250,7 @@ namespace asio { template void accept( AcceptorType& acc, SocketType& sock ) { //promise::ptr p( new promise("fc::asio::tcp::accept") ); - promise::ptr p( new promise("fc::asio::tcp::accept") ); + promise::ptr p = promise::create("fc::asio::tcp::accept"); acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); p->wait(); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); @@ -262,7 +262,7 @@ namespace asio { */ template void connect( AsyncSocket& sock, const EndpointType& ep ) { - promise::ptr p(new promise("fc::asio::tcp::connect")); + promise::ptr p = promise::create("fc::asio::tcp::connect"); sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) ); p->wait(); //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); diff --git a/include/fc/shared_ptr.hpp b/include/fc/shared_ptr.hpp deleted file mode 100644 index a07ec46..0000000 --- a/include/fc/shared_ptr.hpp +++ /dev/null @@ -1,92 +0,0 @@ -#pragma once - -#include -#include - -namespace fc { - - /** - * @brief used to create reference counted types. - * - * Must be a virtual base class that is initialized with the - * - */ - class retainable { - public: - retainable(); - void retain(); - void release(); - int32_t retain_count()const; - - protected: - virtual ~retainable(); - private: - volatile int32_t _ref_count; - }; - - template - class shared_ptr { - public: - template - shared_ptr( const shared_ptr& o ) - :_ptr(o.get()) { - if(_ptr != nullptr ) _ptr->retain(); - } - shared_ptr( const shared_ptr& o ) - :_ptr(o.get()) { - if(_ptr != nullptr ) _ptr->retain(); - } - - shared_ptr( T* t, bool inc = false ) - :_ptr(t) { if( inc && t != nullptr) t->retain(); } - - shared_ptr():_ptr(nullptr){} - - - shared_ptr( shared_ptr&& p ) - :_ptr(p._ptr){ p._ptr = nullptr; } - - ~shared_ptr() { if( nullptr != _ptr ) { _ptr->release(); _ptr = nullptr; } } - - shared_ptr& reset( T* v = nullptr, bool inc = false ) { - if( v == _ptr ) return *this; - if( inc && nullptr != v ) v->retain(); - if( nullptr != _ptr ) _ptr->release(); - _ptr = v; - return *this; - } - - shared_ptr& operator=(const shared_ptr& p ) { - if( _ptr == p._ptr ) return *this; - if( p._ptr != nullptr ) p._ptr->retain(); - if( _ptr != nullptr ) _ptr->release(); - _ptr = p._ptr; - return *this; - } - shared_ptr& operator=(shared_ptr&& p ) { - std::swap(_ptr,p._ptr); - return *this; - } - T& operator* ()const { return *_ptr; } - T* operator-> ()const { return _ptr; } - - bool operator==( const shared_ptr& p )const { return get() == p.get(); } - bool operator<( const shared_ptr& p )const { return get() < p.get(); } - T * get() const { return _ptr; } - - bool operator!()const { return _ptr == 0; } - operator bool()const { return _ptr != 0; } - private: - T* _ptr; - }; - - template - fc::shared_ptr dynamic_pointer_cast( const fc::shared_ptr& t ) { - return fc::shared_ptr( dynamic_cast(t.get()), true ); - } - template - fc::shared_ptr static_pointer_cast( const fc::shared_ptr& t ) { - return fc::shared_ptr( static_cast(t.get()), true ); - } -} - diff --git a/include/fc/signals.hpp b/include/fc/signals.hpp index dd886a6..5fb4671 100644 --- a/include/fc/signals.hpp +++ b/include/fc/signals.hpp @@ -36,13 +36,13 @@ namespace fc { template inline T wait( boost::signals2::signal& sig, const microseconds& timeout_us=microseconds::maximum() ) { - typename promise::ptr p(new promise("fc::signal::wait")); + typename promise::ptr p = promise::create("fc::signal::wait"); boost::signals2::scoped_connection c( sig.connect( [=]( T t ) { p->set_value(t); } )); return p->wait( timeout_us ); } inline void wait( boost::signals2::signal& sig, const microseconds& timeout_us=microseconds::maximum() ) { - promise::ptr p(new promise("fc::signal::wait")); + promise::ptr p = promise::create("fc::signal::wait"); boost::signals2::scoped_connection c( sig.connect( [=]() { p->set_value(); } )); p->wait( timeout_us ); } diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index f0365ea..a6937c6 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -1,9 +1,11 @@ #pragma once #include -#include #include #include #include +#include + +#include //#define FC_TASK_NAMES_ARE_MANDATORY 1 #ifdef FC_TASK_NAMES_ARE_MANDATORY @@ -56,10 +58,10 @@ namespace fc { }; } - class promise_base : public virtual retainable{ + class promise_base : public std::enable_shared_from_this { public: - typedef fc::shared_ptr ptr; - promise_base(const char* desc FC_TASK_NAME_DEFAULT_ARG); + typedef std::shared_ptr ptr; + virtual ~promise_base(); const char* get_desc()const; @@ -70,7 +72,12 @@ namespace fc { void set_exception( const fc::exception_ptr& e ); + void retain(); + void release(); + protected: + promise_base(const char* desc FC_TASK_NAME_DEFAULT_ARG); + void _wait( const microseconds& timeout_us ); void _wait_until( const time_point& timeout_us ); void _enqueue_thread(); @@ -80,7 +87,6 @@ namespace fc { void _set_value(const void* v); void _on_complete( detail::completion_handler* c ); - ~promise_base(); private: friend class thread; @@ -99,18 +105,31 @@ namespace fc { const char* _cancellation_reason; private: #endif - const char* _desc; - detail::completion_handler* _compl; + const char* _desc; + detail::completion_handler* _compl; + std::shared_ptr _self; + boost::atomic _retain_count; }; template class promise : virtual public promise_base { public: - typedef fc::shared_ptr< promise > ptr; - promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} - promise( const T& val ){ set_value(val); } - promise( T&& val ){ set_value(std::move(val) ); } - + typedef std::shared_ptr< promise > ptr; + virtual ~promise(){} + + static ptr create( const char* desc FC_TASK_NAME_DEFAULT_ARG ) + { + return ptr( new promise( desc ) ); + } + static ptr create( const T& val ) + { + return ptr( new promise( val ) ); + } + static ptr create( T&& val ) + { + return ptr( new promise( std::move(val) ) ); + } + const T& wait(const microseconds& timeout = microseconds::maximum() ){ this->_wait( timeout ); return *result; @@ -135,19 +154,29 @@ namespace fc { _on_complete( new detail::completion_handler_impl(fc::forward(c)) ); } protected: + promise( const char* desc ):promise_base(desc){} + promise( const T& val ){ set_value(val); } + promise( T&& val ){ set_value(std::move(val) ); } + optional result; - ~promise(){} }; template<> class promise : virtual public promise_base { public: - typedef fc::shared_ptr< promise > ptr; - promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} - promise( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG ){ - if( fulfilled ) set_value(); - } + typedef std::shared_ptr< promise > ptr; + + virtual ~promise(){} + static ptr create( const char* desc FC_TASK_NAME_DEFAULT_ARG ) + { + return ptr( new promise( desc ) ); + } + static ptr create( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG ) + { + return ptr( new promise( fulfilled, desc ) ); + } + void wait(const microseconds& timeout = microseconds::maximum() ){ this->_wait( timeout ); } @@ -163,7 +192,10 @@ namespace fc { _on_complete( new detail::completion_handler_impl(fc::forward(c)) ); } protected: - ~promise(){} + promise( const char* desc ):promise_base(desc){} + promise( bool fulfilled, const char* desc ){ + if( fulfilled ) set_value(); + } }; /** @@ -184,8 +216,8 @@ namespace fc { template class future { public: - future( const fc::shared_ptr>& p ):m_prom(p){} - future( fc::shared_ptr>&& p ):m_prom(std::move(p)){} + future( const typename promise::ptr& p ):m_prom(p){} + future( typename promise::ptr&& p ):m_prom(std::move(p)){} future(const future& f ) : m_prom(f.m_prom){} future(){} @@ -194,7 +226,6 @@ namespace fc { return *this; } - operator const T&()const { return wait(); } /// @pre valid() @@ -251,14 +282,14 @@ namespace fc { } private: friend class thread; - fc::shared_ptr> m_prom; + typename promise::ptr m_prom; }; template<> class future { public: - future( const fc::shared_ptr>& p ):m_prom(p){} - future( fc::shared_ptr>&& p ):m_prom(std::move(p)){} + future( const typename promise::ptr& p ):m_prom(p){} + future( typename promise::ptr&& p ):m_prom(std::move(p)){} future(const future& f ) : m_prom(f.m_prom){} future(){} @@ -313,7 +344,7 @@ namespace fc { private: friend class thread; - fc::shared_ptr> m_prom; + typename promise::ptr m_prom; }; } diff --git a/include/fc/thread/parallel.hpp b/include/fc/thread/parallel.hpp index 59f9cf6..f66e1c6 100644 --- a/include/fc/thread/parallel.hpp +++ b/include/fc/thread/parallel.hpp @@ -55,8 +55,8 @@ namespace fc { ~ticket_guard(); void wait_for_my_turn(); private: - promise* my_promise; - future* ticket; + promise::ptr my_promise; + future* ticket; }; friend class ticket_guard; @@ -97,10 +97,10 @@ namespace fc { auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future { typedef decltype(f()) Result; typedef typename fc::deduce::type FunctorType; - fc::task* tsk = - new fc::task( fc::forward(f), desc ); - fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - detail::get_worker_pool().post( tsk ); + typename task::ptr tsk = + task::create( fc::forward(f), desc ); + fc::future r( std::dynamic_pointer_cast< promise >(tsk) ); + detail::get_worker_pool().post( tsk.get() ); return r; } } diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index 987810b..a3162da 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -31,9 +31,9 @@ namespace fc { public: void run(); virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override; + ~task_base(); protected: - ~task_base(); /// Task priority looks like unsupported feature. uint64_t _posted_num; priority _prio; @@ -90,6 +90,19 @@ namespace fc { template class task : virtual public task_base, virtual public promise { public: + typedef std::shared_ptr> ptr; + + virtual ~task(){} + + template + static ptr create( Functor&& f, const char* desc ) + { + return ptr( new task( std::move(f), desc ) ); + } + virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); } + + alignas(double) char _functor[FunctorSize]; + private: template task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise(desc) { typedef typename fc::deduce::type FunctorType; @@ -100,16 +113,24 @@ namespace fc { _promise_impl = static_cast*>(this); _run_functor = &detail::functor_run::run; } - virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); } - - alignas(double) char _functor[FunctorSize]; - private: - ~task(){} }; template - class task : virtual public task_base, virtual public promise { + class task : public task_base, public promise { public: + typedef std::shared_ptr> ptr; + + virtual ~task(){} + + template + static ptr create( Functor&& f, const char* desc ) + { + return ptr( new task( std::move(f), desc ) ); + } + virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); } + + alignas(double) char _functor[FunctorSize]; + private: template task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise(desc) { typedef typename fc::deduce::type FunctorType; @@ -120,11 +141,6 @@ namespace fc { _promise_impl = static_cast*>(this); _run_functor = &detail::void_functor_run::run; } - virtual void cancel(const char* reason FC_CANCELATION_REASON_DEFAULT_ARG) override { task_base::cancel(reason); } - - alignas(double) char _functor[FunctorSize]; - private: - ~task(){} }; } diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index b71c066..5c08b71 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -87,10 +87,10 @@ namespace fc { auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { typedef decltype(f()) Result; typedef typename fc::deduce::type FunctorType; - fc::task* tsk = - new fc::task( fc::forward(f), desc ); - fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - async_task(tsk,prio); + typename task::ptr tsk = + task::create( fc::forward(f), desc ); + fc::future r( std::dynamic_pointer_cast< promise >(tsk) ); + async_task(tsk.get(),prio); return r; } void poke(); @@ -109,10 +109,10 @@ namespace fc { auto schedule( Functor&& f, const fc::time_point& when, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { typedef decltype(f()) Result; - fc::task* tsk = - new fc::task( fc::forward(f), desc ); - fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - async_task(tsk,prio,when); + typename task::ptr tsk = + task::create( fc::forward(f), desc ); + fc::future r( std::dynamic_pointer_cast< promise >(tsk) ); + async_task(tsk.get(),prio,when); return r; } @@ -147,8 +147,8 @@ namespace fc { template int wait_any( const fc::future& f1, const fc::future& f2, const microseconds& timeout_us = microseconds::maximum()) { std::vector proms(2); - proms[0] = fc::static_pointer_cast(f1.m_prom); - proms[1] = fc::static_pointer_cast(f2.m_prom); + proms[0] = std::static_pointer_cast(f1.m_prom); + proms[1] = std::static_pointer_cast(f2.m_prom); return wait_any_until(std::move(proms), fc::time_point::now()+timeout_us ); } private: diff --git a/src/asio.cpp b/src/asio.cpp index 35b2990..b56ec49 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -190,7 +190,7 @@ namespace fc { try { resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >("tcp::resolve completion") ); + promise >::ptr p = promise >::create("tcp::resolve completion"); res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), boost::bind( detail::resolve_handler, p, _1, _2 ) ); return p->wait(); @@ -204,7 +204,7 @@ namespace fc { try { resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >("udp::resolve completion") ); + promise >::ptr p = promise >::create("udp::resolve completion"); res.async_resolve( resolver::query(hostname,port), boost::bind( detail::resolve_handler, p, _1, _2 ) ); return p->wait(); diff --git a/src/io/iostream.cpp b/src/io/iostream.cpp index c44fc83..c7d364c 100644 --- a/src/io/iostream.cpp +++ b/src/io/iostream.cpp @@ -24,7 +24,7 @@ namespace fc { std::cin.read(&c,1); while( !std::cin.eof() ) { while( write_pos - read_pos > 0xfffff ) { - fc::promise::ptr wr( new fc::promise("cin_buffer::write_ready") ); + fc::promise::ptr wr = fc::promise::create("cin_buffer::write_ready"); write_ready = wr; if( write_pos - read_pos <= 0xfffff ) { wr->wait(); @@ -138,7 +138,7 @@ namespace fc { do { while( !b.eof && (b.write_pos - b.read_pos)==0 ){ // wait for more... - fc::promise::ptr rr( new fc::promise("cin_buffer::read_ready") ); + fc::promise::ptr rr = fc::promise::create("cin_buffer::read_ready"); { // copy read_ready because it is accessed from multiple threads fc::scoped_lock lock( b.read_ready_mutex ); b.read_ready = rr; diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index b32b863..5138fb0 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -292,8 +292,8 @@ namespace fc { namespace http { if( _server.is_listening() ) _server.stop_listening(); - if ( _connections.size() ) - _closed = new fc::promise(); + if( _connections.size() ) + _closed = promise::create(); auto cpy_con = _connections; for( auto item : cpy_con ) @@ -642,13 +642,13 @@ namespace fc { namespace http { websocketpp::lib::error_code ec; my->_uri = uri; - my->_connected = fc::promise::ptr( new fc::promise("websocket::connect") ); + my->_connected = promise::create("websocket::connect"); my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ my->_hdl = hdl; auto con = my->_client.get_con_from_hdl(hdl); my->_connection = std::make_shared>( con ); - my->_closed = fc::promise::ptr( new fc::promise("websocket::closed") ); + my->_closed = promise::create("websocket::closed"); my->_connected->set_value(); }); @@ -670,12 +670,12 @@ namespace fc { namespace http { websocketpp::lib::error_code ec; smy->_uri = uri; - smy->_connected = fc::promise::ptr( new fc::promise("websocket::connect") ); + smy->_connected = promise::create("websocket::connect"); smy->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ auto con = smy->_client.get_con_from_hdl(hdl); smy->_connection = std::make_shared>( con ); - smy->_closed = fc::promise::ptr( new fc::promise("websocket::closed") ); + smy->_closed = promise::create("websocket::closed"); smy->_connected->set_value(); }); @@ -705,12 +705,12 @@ namespace fc { namespace http { // wlog( "connecting to ${uri}", ("uri",uri)); websocketpp::lib::error_code ec; - my->_connected = fc::promise::ptr( new fc::promise("websocket::connect") ); + my->_connected = promise::create("websocket::connect"); my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){ auto con = my->_client.get_con_from_hdl(hdl); my->_connection = std::make_shared>( con ); - my->_closed = fc::promise::ptr( new fc::promise("websocket::closed") ); + my->_closed = promise::create("websocket::closed"); my->_connected->set_value(); }); diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index ab7464e..38b8618 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -284,7 +284,7 @@ namespace fc size_t bytes_read; if (_download_bytes_per_second) { - promise::ptr completion_promise(new promise("rate_limiting_group_impl::readsome")); + promise::ptr completion_promise = promise::create("rate_limiting_group_impl::readsome"); rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise); _read_operations_for_next_iteration.push_back(&read_operation); @@ -330,7 +330,7 @@ namespace fc size_t bytes_written; if (_upload_bytes_per_second) { - promise::ptr completion_promise(new promise("rate_limiting_group_impl::writesome")); + promise::ptr completion_promise = promise::create("rate_limiting_group_impl::writesome"); rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise); _write_operations_for_next_iteration.push_back(&write_operation); @@ -367,7 +367,7 @@ namespace fc process_pending_operations(_last_read_iteration_time, _download_bytes_per_second, _read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens); - _new_read_operation_available_promise = new promise("rate_limiting_group_impl::process_pending_reads"); + _new_read_operation_available_promise = promise::create("rate_limiting_group_impl::process_pending_reads"); try { if (_read_operations_in_progress.empty()) @@ -388,7 +388,7 @@ namespace fc process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second, _write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens); - _new_write_operation_available_promise = new promise("rate_limiting_group_impl::process_pending_writes"); + _new_write_operation_available_promise = promise::create("rate_limiting_group_impl::process_pending_writes"); try { if (_write_operations_in_progress.empty()) diff --git a/src/network/udp_socket.cpp b/src/network/udp_socket.cpp index c813831..66eb96d 100644 --- a/src/network/udp_socket.cpp +++ b/src/network/udp_socket.cpp @@ -56,7 +56,7 @@ namespace fc { throw; } - promise::ptr completion_promise(new promise("udp_socket::send_to")); + promise::ptr completion_promise = promise::create("udp_socket::send_to"); my->_sock.async_send_to( boost::asio::buffer(buffer, length), to_asio_ep(to), asio::detail::read_write_handler(completion_promise) ); @@ -76,7 +76,7 @@ namespace fc { throw; } - promise::ptr completion_promise(new promise("udp_socket::send_to")); + promise::ptr completion_promise = promise::create("udp_socket::send_to"); my->_sock.async_send_to( boost::asio::buffer(buffer.get(), length), to_asio_ep(to), asio::detail::read_write_handler_with_buffer(completion_promise, buffer) ); @@ -111,7 +111,7 @@ namespace fc { } boost::asio::ip::udp::endpoint boost_from_endpoint; - promise::ptr completion_promise(new promise("udp_socket::receive_from")); + promise::ptr completion_promise = promise::create("udp_socket::receive_from"); my->_sock.async_receive_from( boost::asio::buffer(receive_buffer.get(), receive_buffer_length), boost_from_endpoint, asio::detail::read_write_handler_with_buffer(completion_promise, receive_buffer) ); @@ -137,7 +137,7 @@ namespace fc { } boost::asio::ip::udp::endpoint boost_from_endpoint; - promise::ptr completion_promise(new promise("udp_socket::receive_from")); + promise::ptr completion_promise = promise::create("udp_socket::receive_from"); my->_sock.async_receive_from( boost::asio::buffer(receive_buffer, receive_buffer_length), boost_from_endpoint, asio::detail::read_write_handler(completion_promise) ); size_t bytes_read = completion_promise->wait(); diff --git a/src/rpc/state.cpp b/src/rpc/state.cpp index 97c9ba1..fe2879b 100644 --- a/src/rpc/state.cpp +++ b/src/rpc/state.cpp @@ -46,7 +46,7 @@ void state::handle_reply( const response& response ) request state::start_remote_call( const string& method_name, variants args ) { request request{ _next_id++, method_name, std::move(args) }; - _awaiting[*request.id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); + _awaiting[*request.id] = fc::promise::create("json_connection::async_call"); return request; } variant state::wait_for_response( const variant& request_id ) diff --git a/src/shared_ptr.cpp b/src/shared_ptr.cpp deleted file mode 100644 index 9e3a144..0000000 --- a/src/shared_ptr.cpp +++ /dev/null @@ -1,30 +0,0 @@ -#include -#include -#include -#include - -namespace fc { - retainable::retainable() - :_ref_count(1) { - static_assert( sizeof(_ref_count) == sizeof(boost::atomic), "failed to reserve enough space" ); - } - - retainable::~retainable() { - assert( _ref_count <= 0 ); - assert( _ref_count == 0 ); - } - void retainable::retain() { - ((boost::atomic*)&_ref_count)->fetch_add(1, boost::memory_order_relaxed ); - } - - void retainable::release() { - boost::atomic_thread_fence(boost::memory_order_acquire); - if( 1 == ((boost::atomic*)&_ref_count)->fetch_sub(1, boost::memory_order_release ) ) { - delete this; - } - } - - int32_t retainable::retain_count()const { - return _ref_count; - } -} diff --git a/src/thread/future.cpp b/src/thread/future.cpp index 2111584..d1c9a94 100644 --- a/src/thread/future.cpp +++ b/src/thread/future.cpp @@ -19,7 +19,8 @@ namespace fc { _cancellation_reason(nullptr), #endif _desc(desc), - _compl(nullptr) + _compl(nullptr), + _retain_count(0) { } const char* promise_base::get_desc()const{ @@ -72,7 +73,7 @@ namespace fc { // See https://github.com/cryptonomex/graphene/issues/597 // - ptr p_this = ptr( this, true ); + ptr p_this = shared_from_this(); try { @@ -123,7 +124,7 @@ namespace fc { blocked_thread = _blocked_thread; } if( blocked_thread ) - blocked_thread->notify(ptr(this,true)); + blocked_thread->notify( shared_from_this() ); } promise_base::~promise_base() { } void promise_base::_set_timeout(){ @@ -150,5 +151,13 @@ namespace fc { _compl = c; } } + void promise_base::retain() { + if( _retain_count.fetch_add(1, boost::memory_order_relaxed) == 0 ) + _self = shared_from_this(); + } + void promise_base::release() { + if( _retain_count.fetch_sub(1, boost::memory_order_release) == 1 ) + _self.reset(); + } } diff --git a/src/thread/parallel.cpp b/src/thread/parallel.cpp index 10b709e..10ef63b 100644 --- a/src/thread/parallel.cpp +++ b/src/thread/parallel.cpp @@ -158,8 +158,8 @@ namespace fc { serial_valve::ticket_guard::ticket_guard( boost::atomic*>& latch ) { - my_promise = new promise(); - future* my_future = new future( promise::ptr( my_promise, true ) ); + my_promise = promise::create(); + future* my_future = new future( my_promise ); try { do @@ -171,7 +171,7 @@ namespace fc { } catch (...) { - delete my_future; // this takes care of my_promise as well + delete my_future; throw; } } @@ -190,7 +190,7 @@ namespace fc { serial_valve::serial_valve() { - latch.store( new future( promise::ptr( new promise( true ), true ) ) ); + latch.store( new future( promise::create( true ) ) ); } serial_valve::~serial_valve() diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 267ac62..00cd1d1 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -72,7 +72,7 @@ namespace fc { } thread::thread( const std::string& name, thread_idle_notifier* notifier ) { - promise::ptr p(new promise("thread start")); + promise::ptr p = promise::create("thread start"); boost::thread* t = new boost::thread( [this,p,name,notifier]() { try { set_thread_name(name.c_str()); // set thread's name for the debugger to display diff --git a/tests/thread/parallel_tests.cpp b/tests/thread/parallel_tests.cpp index 6e91da4..14dcb4d 100644 --- a/tests/thread/parallel_tests.cpp +++ b/tests/thread/parallel_tests.cpp @@ -200,25 +200,25 @@ BOOST_AUTO_TEST_CASE( serial_valve ) fc::serial_valve valve; { // Simple test, f2 finishes before f1 - fc::promise* syncer = new fc::promise(); - fc::promise* waiter = new fc::promise(); + fc::promise::ptr syncer = fc::promise::create(); + fc::promise::ptr waiter = fc::promise::create(); auto p1 = fc::async([&counter,&valve,syncer,waiter] () { valve.do_serial( [syncer,waiter](){ syncer->set_value(); - fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + fc::future( waiter ).wait(); }, [&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); // at this point, p1.f1 has started executing and is waiting on waiter - syncer = new fc::promise(); + syncer = fc::promise::create(); auto p2 = fc::async([&counter,&valve,syncer] () { valve.do_serial( [syncer](){ syncer->set_value(); }, [&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); fc::usleep( fc::milliseconds(10) ); @@ -237,37 +237,37 @@ BOOST_AUTO_TEST_CASE( serial_valve ) } { // Triple test, f3 finishes first, then f1, finally f2 - fc::promise* syncer = new fc::promise(); - fc::promise* waiter = new fc::promise(); + fc::promise::ptr syncer = fc::promise::create(); + fc::promise::ptr waiter = fc::promise::create(); counter.store(0); auto p1 = fc::async([&counter,&valve,syncer,waiter] () { valve.do_serial( [&syncer,waiter](){ syncer->set_value(); - fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + fc::future( waiter ).wait(); }, [&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); // at this point, p1.f1 has started executing and is waiting on waiter - syncer = new fc::promise(); + syncer = fc::promise::create(); auto p2 = fc::async([&counter,&valve,syncer] () { valve.do_serial( [&syncer](){ syncer->set_value(); fc::usleep( fc::milliseconds(100) ); }, [&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); // at this point, p2.f1 has started executing and is sleeping - syncer = new fc::promise(); + syncer = fc::promise::create(); auto p3 = fc::async([&counter,&valve,syncer] () { valve.do_serial( [syncer](){ syncer->set_value(); }, [&counter](){ BOOST_CHECK_EQUAL( 2u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); fc::usleep( fc::milliseconds(10) ); @@ -288,37 +288,37 @@ BOOST_AUTO_TEST_CASE( serial_valve ) } { // Triple test again but with invocations from different threads - fc::promise* syncer = new fc::promise(); - fc::promise* waiter = new fc::promise(); + fc::promise::ptr syncer = fc::promise::create(); + fc::promise::ptr waiter = fc::promise::create(); counter.store(0); auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () { valve.do_serial( [&syncer,waiter](){ syncer->set_value(); - fc::future( fc::shared_ptr>( waiter, true ) ).wait(); }, + fc::future( waiter ).wait(); }, [&counter](){ BOOST_CHECK_EQUAL( 0u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); // at this point, p1.f1 has started executing and is waiting on waiter - syncer = new fc::promise(); + syncer = fc::promise::create(); auto p2 = fc::do_parallel([&counter,&valve,syncer] () { valve.do_serial( [&syncer](){ syncer->set_value(); fc::usleep( fc::milliseconds(100) ); }, [&counter](){ BOOST_CHECK_EQUAL( 1u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); // at this point, p2.f1 has started executing and is sleeping - syncer = new fc::promise(); + syncer = fc::promise::create(); auto p3 = fc::do_parallel([&counter,&valve,syncer] () { valve.do_serial( [syncer](){ syncer->set_value(); }, [&counter](){ BOOST_CHECK_EQUAL( 2u, counter.load() ); counter.fetch_add(1); } ); }); - fc::future( fc::shared_ptr>( syncer, true ) ).wait(); + fc::future( syncer ).wait(); fc::usleep( fc::milliseconds(10) ); diff --git a/tests/thread/task_cancel.cpp b/tests/thread/task_cancel.cpp index 3435f39..2909ca7 100644 --- a/tests/thread/task_cancel.cpp +++ b/tests/thread/task_cancel.cpp @@ -136,7 +136,7 @@ BOOST_AUTO_TEST_CASE( cancel_a_task_waiting_on_promise ) { enum task_result{task_completed, task_aborted}; - fc::promise::ptr promise_to_wait_on(new fc::promise()); + fc::promise::ptr promise_to_wait_on = fc::promise::create(); fc::future task = fc::async([promise_to_wait_on]() { BOOST_TEST_MESSAGE("Starting async task");