From c5b4069abd5509d2427e412c592df37b69edb816 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Sun, 9 Sep 2012 19:44:49 -0400 Subject: [PATCH] fixed scheduler deleting retainable --- CMakeCache.txt | 6 ++---- include/fc/future.hpp | 14 +++++++++----- include/fc/json_rpc_connection.hpp | 6 ++++++ include/fc/shared_ptr.hpp | 15 +++++++-------- include/fc/task.hpp | 13 ++++++++++--- include/fc/thread.hpp | 12 ++++++++---- include/fc/thread_d.hpp | 3 ++- src/future.cpp | 14 ++++++++------ src/log.cpp | 10 ++++++++-- src/shared_ptr.cpp | 4 ++++ src/task.cpp | 8 ++++---- src/thread.cpp | 8 +++++--- src/thread_d.hpp | 2 +- 13 files changed, 74 insertions(+), 41 deletions(-) diff --git a/CMakeCache.txt b/CMakeCache.txt index 71a0fef..eb87577 100644 --- a/CMakeCache.txt +++ b/CMakeCache.txt @@ -1,5 +1,5 @@ # This is the CMakeCache file. -# For build in directory: /Users/dlarimer/Downloads/fc +# For build in directory: /Users/dlarimer/projects/tornet/fc # It was generated by CMake: /opt/local/bin/cmake # You can edit this file to change values found and used by cmake. # If you do not want to change any of the values, simply exit the editor. @@ -378,12 +378,10 @@ Boost_UNIT_TEST_FRAMEWORK_LIBRARY_RELEASE-ADVANCED:INTERNAL=1 Boost_VERSION:INTERNAL=105100 //ADVANCED property for variable: CMAKE_AR CMAKE_AR-ADVANCED:INTERNAL=1 -//ADVANCED property for variable: CMAKE_BUILD_TOOL -CMAKE_BUILD_TOOL-ADVANCED:INTERNAL=1 //What is the target build tool cmake is generating for. CMAKE_BUILD_TOOL:INTERNAL=/usr/bin/make //This is the directory where this CMakeCache.txt was created -CMAKE_CACHEFILE_DIR:INTERNAL=/Users/dlarimer/Downloads/fc +CMAKE_CACHEFILE_DIR:INTERNAL=/Users/dlarimer/projects/tornet/fc //Major version of cmake used to create the current loaded cache CMAKE_CACHE_MAJOR_VERSION:INTERNAL=2 //Minor version of cmake used to create the current loaded cache diff --git a/include/fc/future.hpp b/include/fc/future.hpp index 70c53b1..abc0119 100644 --- a/include/fc/future.hpp +++ b/include/fc/future.hpp @@ -49,7 +49,7 @@ namespace fc { class promise_base : virtual public retainable { public: typedef shared_ptr ptr; - promise_base(const char* desc=""); + promise_base(const char* desc="?"); const char* get_desc()const; @@ -68,6 +68,8 @@ namespace fc { void _set_value(const void* v); void _on_complete( detail::completion_handler* c ); + ~promise_base(); + private: friend class thread; friend struct context; @@ -87,10 +89,9 @@ namespace fc { class promise : virtual public promise_base { public: typedef shared_ptr< promise > ptr; - promise( const char* desc = "" ):promise_base(desc){} + promise( const char* desc = "?" ):promise_base(desc){} promise( const T& val ){ set_value(val); } promise( T&& val ){ set_value(fc::move(val) ); } - ~promise(){} const T& wait(const microseconds& timeout = microseconds::max() ){ this->_wait( timeout ); @@ -117,13 +118,14 @@ namespace fc { } protected: optional result; + ~promise(){} }; template<> - class promise : public promise_base { + class promise : virtual public promise_base { public: typedef shared_ptr< promise > ptr; - promise( const char* desc = "" ):promise_base(desc){} + promise( const char* desc = "?" ):promise_base(desc){} promise( const void_t& v ){ set_value(); } void wait(const microseconds& timeout = microseconds::max() ){ @@ -140,6 +142,8 @@ namespace fc { void on_complete( CompletionHandler&& c ) { _on_complete( new detail::completion_handler_impl(fc::forward(c)) ); } + protected: + ~promise(){} }; /** diff --git a/include/fc/json_rpc_connection.hpp b/include/fc/json_rpc_connection.hpp index a70f7ae..c473e79 100644 --- a/include/fc/json_rpc_connection.hpp +++ b/include/fc/json_rpc_connection.hpp @@ -13,18 +13,24 @@ namespace fc { namespace json { void handle_error( const fc::string& ); int64_t id; pending_result::ptr next; + protected: + ~pending_result(){} }; template struct pending_result_impl : virtual public promise, virtual public pending_result { virtual void handle_result( const fc::string& s ) { set_value( fc::json::from_string(s) ); } + protected: + ~pending_result_impl(){} }; template<> struct pending_result_impl : virtual public promise, virtual public pending_result { virtual void handle_result( const fc::string& ) { set_value(); } + protected: + ~pending_result_impl(){} }; } diff --git a/include/fc/shared_ptr.hpp b/include/fc/shared_ptr.hpp index adba5d4..ed85441 100644 --- a/include/fc/shared_ptr.hpp +++ b/include/fc/shared_ptr.hpp @@ -18,8 +18,7 @@ namespace fc { int32_t retain_count()const; protected: - virtual ~retainable(){}; - + virtual ~retainable(); private: volatile int32_t _ref_count; }; @@ -28,20 +27,20 @@ namespace fc { class shared_ptr { public: shared_ptr( T* t, bool inc = false ) - :_ptr(t) { if( inc ) t->retain(); } + :_ptr(t) { if( inc ) t->retain(); } + + shared_ptr():_ptr(nullptr){} - shared_ptr():_ptr(0){} shared_ptr( const shared_ptr& p ) { _ptr = p._ptr; if( _ptr ) _ptr->retain(); } shared_ptr( shared_ptr&& p ) { _ptr = p._ptr; - p._ptr = 0; - } - ~shared_ptr() { - if( _ptr ) _ptr->release(); + p._ptr = nullptr; } + ~shared_ptr() { if( _ptr ) { _ptr->release(); } } + shared_ptr& reset( T* v = 0 ) { if( v == _ptr ) return *this; if( _ptr ) _ptr->release(); diff --git a/include/fc/task.hpp b/include/fc/task.hpp index 69b8700..6c3cdcb 100644 --- a/include/fc/task.hpp +++ b/include/fc/task.hpp @@ -3,15 +3,18 @@ #include #include #include +#include namespace fc { struct context; + class spin_lock; class task_base : virtual public promise_base { public: - ~task_base(); void run(); protected: + ~task_base(); + uint64_t _posted_num; priority _prio; time_point _when; @@ -24,10 +27,10 @@ namespace fc { // thread/thread_private friend class thread; friend class thread_d; - char _spinlock_store[sizeof(void*)]; + fwd _spinlock; // avoid rtti info for every possible functor... - promise_base* _promise_impl; + void* _promise_impl; void* _functor; void (*_destroy_functor)(void*); void (*_run_functor)(void*, void* ); @@ -66,6 +69,8 @@ namespace fc { _run_functor = &detail::functor_run::run; } aligned _functor; + private: + ~task(){} }; template class task : virtual public task_base, virtual public promise { @@ -80,6 +85,8 @@ namespace fc { _run_functor = &detail::void_functor_run::run; } aligned _functor; + private: + ~task(){} }; } diff --git a/include/fc/thread.hpp b/include/fc/thread.hpp index fdb4b3d..4272b53 100644 --- a/include/fc/thread.hpp +++ b/include/fc/thread.hpp @@ -53,9 +53,11 @@ namespace fc { template auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future { typedef decltype(f()) Result; - fc::task* tsk = new fc::task( fc::forward(f) ); + fc::task* tsk = + new fc::task( fc::forward(f) ); + fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); async_task(tsk,prio,desc); - return fc::future(fc::shared_ptr< fc::promise >(tsk,true) ); + return r; } @@ -72,9 +74,11 @@ namespace fc { auto schedule( Functor&& f, const fc::time_point& when, const char* desc = "", priority prio = priority()) -> fc::future { typedef decltype(f()) Result; - fc::task* tsk = new fc::task( fc::forward(f) ); + fc::task* tsk = + new fc::task( fc::forward(f) ); + fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); async_task(tsk,prio,when,desc); - return fc::future(fc::shared_ptr< fc::promise >(tsk,true) ); + return r; } /** diff --git a/include/fc/thread_d.hpp b/include/fc/thread_d.hpp index d033468..7f42837 100644 --- a/include/fc/thread_d.hpp +++ b/include/fc/thread_d.hpp @@ -273,7 +273,8 @@ namespace fc { next->run(); current->cur_task = 0; next->set_active_context(0); - delete next; + next->release(); + //delete next; return true; } return false; diff --git a/src/future.cpp b/src/future.cpp index b6b6126..5a1c0fb 100644 --- a/src/future.cpp +++ b/src/future.cpp @@ -7,17 +7,17 @@ #include + namespace fc { promise_base::promise_base( const char* desc ) - : _ready(false), + :_ready(false), _blocked_thread(nullptr), _timeout(time_point::max()), _canceled(false), _desc(desc), _compl(nullptr) - { - } + { } const char* promise_base::get_desc()const{ return _desc; @@ -63,21 +63,23 @@ namespace fc { _blocked_thread =&thread::current(); } void promise_base::_notify(){ - if( _blocked_thread ) + if( _blocked_thread != nullptr ) _blocked_thread->notify(ptr(this,true)); } + promise_base::~promise_base() { } void promise_base::_set_timeout(){ if( _ready ) return; set_exception( fc::copy_exception( future_wait_timeout() ) ); } void promise_base::_set_value(const void* s){ - BOOST_ASSERT( !_ready ); + // slog( "%p == %d", &_ready, int(_ready)); +// BOOST_ASSERT( !_ready ); { synchronized(_spin_yield) _ready = true; } _notify(); - if( _compl ) { + if( nullptr != _compl ) { _compl->on_complete(s,_except); } } diff --git a/src/log.cpp b/src/log.cpp index 5342f47..9f6c5fe 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -6,6 +8,7 @@ namespace fc { const char* thread_name(); + void* thread_ptr(); const char* short_name( const char* file_name ) { const char* end = file_name + strlen(file_name); @@ -24,11 +27,14 @@ namespace fc { #define fileno _fileno #endif // WIN32 - void log( const char* color, const char* file_name, size_t line_num, const char* method_name, const char* format, ... ) { + void log( const char* color, const char* file_name, size_t line_num, + const char* method_name, const char* format, ... ) { + fc::unique_lock lock(log_mutex()); if(isatty(fileno(stderr))) fprintf( stderr, "\r%s", color ); - fprintf( stderr, "%-15s %-15s %-5zd %s ", thread_name(), short_name(file_name), line_num, method_name ); + fprintf( stderr, "%p %-15s %-15s %-5zd %-15s ", + thread_ptr(), thread_name(), short_name(file_name), line_num, method_name ); va_list args; va_start(args,format); vfprintf( stderr, format, args ); diff --git a/src/shared_ptr.cpp b/src/shared_ptr.cpp index 9100d29..e26d11b 100644 --- a/src/shared_ptr.cpp +++ b/src/shared_ptr.cpp @@ -1,11 +1,15 @@ #include #include #include +#include namespace fc { retainable::retainable() :_ref_count(1) { } + retainable::~retainable() { + assert( _ref_count == 0 ); + } void retainable::retain() { ((boost::atomic*)&_ref_count)->fetch_add(1, boost::memory_order_relaxed ); } diff --git a/src/task.cpp b/src/task.cpp index f798665..d17f173 100644 --- a/src/task.cpp +++ b/src/task.cpp @@ -2,18 +2,19 @@ #include #include #include +#include + namespace fc { task_base::task_base(void* func) :_functor(func){ - new (&_spinlock_store[0]) fc::spin_lock(); } void task_base::run() { try { _run_functor( _functor, _promise_impl ); } catch ( ... ) { - _promise_impl->set_exception( current_exception() ); + set_exception( current_exception() ); } } task_base::~task_base() { @@ -21,8 +22,7 @@ namespace fc { } void task_base::_set_active_context(context* c) { - void* p = &_spinlock_store[0]; - { synchronized( *((fc::spin_lock*)p)) + { synchronized( *_spinlock ) _active_context = c; } } diff --git a/src/thread.cpp b/src/thread.cpp index b04ad28..9a2daf0 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -5,6 +5,9 @@ namespace fc { const char* thread_name() { return thread::current().name().c_str(); + } + void* thread_ptr() { + return &thread::current(); } boost::mutex& log_mutex() { static boost::mutex m; return m; @@ -275,11 +278,10 @@ namespace fc { void thread::notify( const promise_base::ptr& p ) { BOOST_ASSERT(p->ready()); - if( ¤t() != this ) { - this->async( boost::bind( &thread::notify, this, p ) ); + if( !is_current() ) { + this->async( [=](){ notify(p); } ); return; } - //slog( " notify task complete %1%", p.get() ); //debug( "begin notify" ); // TODO: store a list of blocked contexts with the promise // to accelerate the lookup.... unless it introduces contention... diff --git a/src/thread_d.hpp b/src/thread_d.hpp index a601e54..8fa8dc0 100644 --- a/src/thread_d.hpp +++ b/src/thread_d.hpp @@ -279,7 +279,7 @@ namespace fc { next->run(); current->cur_task = 0; next->_set_active_context(0); - delete next; + next->release(); return true; } return false;