fixed scheduler deleting retainable

This commit is contained in:
Daniel Larimer 2012-09-09 19:44:49 -04:00
parent bebe2d9146
commit c5b4069abd
13 changed files with 74 additions and 41 deletions

View file

@ -1,5 +1,5 @@
# This is the CMakeCache file.
# For build in directory: /Users/dlarimer/Downloads/fc
# For build in directory: /Users/dlarimer/projects/tornet/fc
# It was generated by CMake: /opt/local/bin/cmake
# You can edit this file to change values found and used by cmake.
# If you do not want to change any of the values, simply exit the editor.
@ -378,12 +378,10 @@ Boost_UNIT_TEST_FRAMEWORK_LIBRARY_RELEASE-ADVANCED:INTERNAL=1
Boost_VERSION:INTERNAL=105100
//ADVANCED property for variable: CMAKE_AR
CMAKE_AR-ADVANCED:INTERNAL=1
//ADVANCED property for variable: CMAKE_BUILD_TOOL
CMAKE_BUILD_TOOL-ADVANCED:INTERNAL=1
//What is the target build tool cmake is generating for.
CMAKE_BUILD_TOOL:INTERNAL=/usr/bin/make
//This is the directory where this CMakeCache.txt was created
CMAKE_CACHEFILE_DIR:INTERNAL=/Users/dlarimer/Downloads/fc
CMAKE_CACHEFILE_DIR:INTERNAL=/Users/dlarimer/projects/tornet/fc
//Major version of cmake used to create the current loaded cache
CMAKE_CACHE_MAJOR_VERSION:INTERNAL=2
//Minor version of cmake used to create the current loaded cache

View file

@ -49,7 +49,7 @@ namespace fc {
class promise_base : virtual public retainable {
public:
typedef shared_ptr<promise_base> ptr;
promise_base(const char* desc="");
promise_base(const char* desc="?");
const char* get_desc()const;
@ -68,6 +68,8 @@ namespace fc {
void _set_value(const void* v);
void _on_complete( detail::completion_handler* c );
~promise_base();
private:
friend class thread;
friend struct context;
@ -87,10 +89,9 @@ namespace fc {
class promise : virtual public promise_base {
public:
typedef shared_ptr< promise<T> > ptr;
promise( const char* desc = "" ):promise_base(desc){}
promise( const char* desc = "?" ):promise_base(desc){}
promise( const T& val ){ set_value(val); }
promise( T&& val ){ set_value(fc::move(val) ); }
~promise(){}
const T& wait(const microseconds& timeout = microseconds::max() ){
this->_wait( timeout );
@ -117,13 +118,14 @@ namespace fc {
}
protected:
optional<T> result;
~promise(){}
};
template<>
class promise<void> : public promise_base {
class promise<void> : virtual public promise_base {
public:
typedef shared_ptr< promise<void> > ptr;
promise( const char* desc = "" ):promise_base(desc){}
promise( const char* desc = "?" ):promise_base(desc){}
promise( const void_t& v ){ set_value(); }
void wait(const microseconds& timeout = microseconds::max() ){
@ -140,6 +142,8 @@ namespace fc {
void on_complete( CompletionHandler&& c ) {
_on_complete( new detail::completion_handler_impl<CompletionHandler,void>(fc::forward<CompletionHandler>(c)) );
}
protected:
~promise(){}
};
/**

View file

@ -13,18 +13,24 @@ namespace fc { namespace json {
void handle_error( const fc::string& );
int64_t id;
pending_result::ptr next;
protected:
~pending_result(){}
};
template<typename T>
struct pending_result_impl : virtual public promise<T>, virtual public pending_result {
virtual void handle_result( const fc::string& s ) {
set_value( fc::json::from_string<T>(s) );
}
protected:
~pending_result_impl(){}
};
template<>
struct pending_result_impl<void> : virtual public promise<void>, virtual public pending_result {
virtual void handle_result( const fc::string& ) {
set_value();
}
protected:
~pending_result_impl(){}
};
}

View file

@ -18,8 +18,7 @@ namespace fc {
int32_t retain_count()const;
protected:
virtual ~retainable(){};
virtual ~retainable();
private:
volatile int32_t _ref_count;
};
@ -28,20 +27,20 @@ namespace fc {
class shared_ptr {
public:
shared_ptr( T* t, bool inc = false )
:_ptr(t) { if( inc ) t->retain(); }
:_ptr(t) { if( inc ) t->retain(); }
shared_ptr():_ptr(nullptr){}
shared_ptr():_ptr(0){}
shared_ptr( const shared_ptr& p ) {
_ptr = p._ptr;
if( _ptr ) _ptr->retain();
}
shared_ptr( shared_ptr&& p ) {
_ptr = p._ptr;
p._ptr = 0;
}
~shared_ptr() {
if( _ptr ) _ptr->release();
p._ptr = nullptr;
}
~shared_ptr() { if( _ptr ) { _ptr->release(); } }
shared_ptr& reset( T* v = 0 ) {
if( v == _ptr ) return *this;
if( _ptr ) _ptr->release();

View file

@ -3,15 +3,18 @@
#include <fc/future.hpp>
#include <fc/priority.hpp>
#include <fc/aligned.hpp>
#include <fc/fwd.hpp>
namespace fc {
struct context;
class spin_lock;
class task_base : virtual public promise_base {
public:
~task_base();
void run();
protected:
~task_base();
uint64_t _posted_num;
priority _prio;
time_point _when;
@ -24,10 +27,10 @@ namespace fc {
// thread/thread_private
friend class thread;
friend class thread_d;
char _spinlock_store[sizeof(void*)];
fwd<spin_lock,8> _spinlock;
// avoid rtti info for every possible functor...
promise_base* _promise_impl;
void* _promise_impl;
void* _functor;
void (*_destroy_functor)(void*);
void (*_run_functor)(void*, void* );
@ -66,6 +69,8 @@ namespace fc {
_run_functor = &detail::functor_run<Functor>::run;
}
aligned<FunctorSize> _functor;
private:
~task(){}
};
template<uint64_t FunctorSize>
class task<void,FunctorSize> : virtual public task_base, virtual public promise<void> {
@ -80,6 +85,8 @@ namespace fc {
_run_functor = &detail::void_functor_run<Functor>::run;
}
aligned<FunctorSize> _functor;
private:
~task(){}
};
}

View file

@ -53,9 +53,11 @@ namespace fc {
template<typename Functor>
auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future<decltype(f())> {
typedef decltype(f()) Result;
fc::task<Result,sizeof(Functor)>* tsk = new fc::task<Result,sizeof(Functor)>( fc::forward<Functor>(f) );
fc::task<Result,sizeof(Functor)>* tsk =
new fc::task<Result,sizeof(Functor)>( fc::forward<Functor>(f) );
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
async_task(tsk,prio,desc);
return fc::future<Result>(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
return r;
}
@ -72,9 +74,11 @@ namespace fc {
auto schedule( Functor&& f, const fc::time_point& when,
const char* desc = "", priority prio = priority()) -> fc::future<decltype(f())> {
typedef decltype(f()) Result;
fc::task<Result,sizeof(Functor)>* tsk = new fc::task<Result,sizeof(Functor)>( fc::forward<Functor>(f) );
fc::task<Result,sizeof(Functor)>* tsk =
new fc::task<Result,sizeof(Functor)>( fc::forward<Functor>(f) );
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
async_task(tsk,prio,when,desc);
return fc::future<Result>(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
return r;
}
/**

View file

@ -273,7 +273,8 @@ namespace fc {
next->run();
current->cur_task = 0;
next->set_active_context(0);
delete next;
next->release();
//delete next;
return true;
}
return false;

View file

@ -7,17 +7,17 @@
#include <boost/assert.hpp>
namespace fc {
promise_base::promise_base( const char* desc )
: _ready(false),
:_ready(false),
_blocked_thread(nullptr),
_timeout(time_point::max()),
_canceled(false),
_desc(desc),
_compl(nullptr)
{
}
{ }
const char* promise_base::get_desc()const{
return _desc;
@ -63,21 +63,23 @@ namespace fc {
_blocked_thread =&thread::current();
}
void promise_base::_notify(){
if( _blocked_thread )
if( _blocked_thread != nullptr )
_blocked_thread->notify(ptr(this,true));
}
promise_base::~promise_base() { }
void promise_base::_set_timeout(){
if( _ready )
return;
set_exception( fc::copy_exception( future_wait_timeout() ) );
}
void promise_base::_set_value(const void* s){
BOOST_ASSERT( !_ready );
// slog( "%p == %d", &_ready, int(_ready));
// BOOST_ASSERT( !_ready );
{ synchronized(_spin_yield)
_ready = true;
}
_notify();
if( _compl ) {
if( nullptr != _compl ) {
_compl->on_complete(s,_except);
}
}

View file

@ -1,4 +1,6 @@
#include <fc/log.hpp>
#include <fc/unique_lock.hpp>
#include <boost/thread/mutex.hpp>
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
@ -6,6 +8,7 @@
namespace fc {
const char* thread_name();
void* thread_ptr();
const char* short_name( const char* file_name ) {
const char* end = file_name + strlen(file_name);
@ -24,11 +27,14 @@ namespace fc {
#define fileno _fileno
#endif // WIN32
void log( const char* color, const char* file_name, size_t line_num, const char* method_name, const char* format, ... ) {
void log( const char* color, const char* file_name, size_t line_num,
const char* method_name, const char* format, ... ) {
fc::unique_lock<boost::mutex> lock(log_mutex());
if(isatty(fileno(stderr)))
fprintf( stderr, "\r%s", color );
fprintf( stderr, "%-15s %-15s %-5zd %s ", thread_name(), short_name(file_name), line_num, method_name );
fprintf( stderr, "%p %-15s %-15s %-5zd %-15s ",
thread_ptr(), thread_name(), short_name(file_name), line_num, method_name );
va_list args;
va_start(args,format);
vfprintf( stderr, format, args );

View file

@ -1,11 +1,15 @@
#include <fc/shared_ptr.hpp>
#include <boost/atomic.hpp>
#include <boost/memory_order.hpp>
#include <assert.h>
namespace fc {
retainable::retainable()
:_ref_count(1) { }
retainable::~retainable() {
assert( _ref_count == 0 );
}
void retainable::retain() {
((boost::atomic<int32_t>*)&_ref_count)->fetch_add(1, boost::memory_order_relaxed );
}

View file

@ -2,18 +2,19 @@
#include <fc/exception.hpp>
#include <fc/unique_lock.hpp>
#include <fc/spin_lock.hpp>
#include <fc/fwd_impl.hpp>
namespace fc {
task_base::task_base(void* func)
:_functor(func){
new (&_spinlock_store[0]) fc::spin_lock();
}
void task_base::run() {
try {
_run_functor( _functor, _promise_impl );
} catch ( ... ) {
_promise_impl->set_exception( current_exception() );
set_exception( current_exception() );
}
}
task_base::~task_base() {
@ -21,8 +22,7 @@ namespace fc {
}
void task_base::_set_active_context(context* c) {
void* p = &_spinlock_store[0];
{ synchronized( *((fc::spin_lock*)p))
{ synchronized( *_spinlock )
_active_context = c;
}
}

View file

@ -5,6 +5,9 @@
namespace fc {
const char* thread_name() {
return thread::current().name().c_str();
}
void* thread_ptr() {
return &thread::current();
}
boost::mutex& log_mutex() {
static boost::mutex m; return m;
@ -275,11 +278,10 @@ namespace fc {
void thread::notify( const promise_base::ptr& p ) {
BOOST_ASSERT(p->ready());
if( &current() != this ) {
this->async( boost::bind( &thread::notify, this, p ) );
if( !is_current() ) {
this->async( [=](){ notify(p); } );
return;
}
//slog( " notify task complete %1%", p.get() );
//debug( "begin notify" );
// TODO: store a list of blocked contexts with the promise
// to accelerate the lookup.... unless it introduces contention...

View file

@ -279,7 +279,7 @@ namespace fc {
next->run();
current->cur_task = 0;
next->_set_active_context(0);
delete next;
next->release();
return true;
}
return false;