Allow us to require assigning descriptions to all async tasks to aid in debugging
This commit is contained in:
parent
9e320a3db8
commit
d847f6469a
14 changed files with 75 additions and 56 deletions
|
|
@ -26,13 +26,13 @@ namespace fc {
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
inline T wait( boost::signals2::signal<void(T)>& sig, const microseconds& timeout_us=microseconds::maximum() ) {
|
inline T wait( boost::signals2::signal<void(T)>& sig, const microseconds& timeout_us=microseconds::maximum() ) {
|
||||||
typename promise<T>::ptr p(new promise<T>());
|
typename promise<T>::ptr p(new promise<T>("fc::signal::wait"));
|
||||||
boost::signals2::scoped_connection c( sig.connect( [=]( T t ) { p->set_value(t); } ));
|
boost::signals2::scoped_connection c( sig.connect( [=]( T t ) { p->set_value(t); } ));
|
||||||
return p->wait( timeout_us );
|
return p->wait( timeout_us );
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void wait( boost::signals2::signal<void()>& sig, const microseconds& timeout_us=microseconds::maximum() ) {
|
inline void wait( boost::signals2::signal<void()>& sig, const microseconds& timeout_us=microseconds::maximum() ) {
|
||||||
promise<void>::ptr p(new promise<void>());
|
promise<void>::ptr p(new promise<void>("fc::signal::wait"));
|
||||||
boost::signals2::scoped_connection c( sig.connect( [=]() { p->set_value(); } ));
|
boost::signals2::scoped_connection c( sig.connect( [=]() { p->set_value(); } ));
|
||||||
p->wait( timeout_us );
|
p->wait( timeout_us );
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,13 @@
|
||||||
#include <fc/thread/spin_yield_lock.hpp>
|
#include <fc/thread/spin_yield_lock.hpp>
|
||||||
#include <fc/optional.hpp>
|
#include <fc/optional.hpp>
|
||||||
|
|
||||||
|
//#define FC_TASK_NAMES_ARE_MANDATORY 1
|
||||||
|
#ifdef FC_TASK_NAMES_ARE_MANDATORY
|
||||||
|
# define FC_TASK_NAME_DEFAULT_ARG
|
||||||
|
#else
|
||||||
|
# define FC_TASK_NAME_DEFAULT_ARG = "?"
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace fc {
|
namespace fc {
|
||||||
class abstract_thread;
|
class abstract_thread;
|
||||||
struct void_t{};
|
struct void_t{};
|
||||||
|
|
@ -47,7 +54,7 @@ namespace fc {
|
||||||
class promise_base : public virtual retainable{
|
class promise_base : public virtual retainable{
|
||||||
public:
|
public:
|
||||||
typedef fc::shared_ptr<promise_base> ptr;
|
typedef fc::shared_ptr<promise_base> ptr;
|
||||||
promise_base(const char* desc="?");
|
promise_base(const char* desc FC_TASK_NAME_DEFAULT_ARG);
|
||||||
|
|
||||||
const char* get_desc()const;
|
const char* get_desc()const;
|
||||||
|
|
||||||
|
|
@ -92,7 +99,7 @@ namespace fc {
|
||||||
class promise : virtual public promise_base {
|
class promise : virtual public promise_base {
|
||||||
public:
|
public:
|
||||||
typedef fc::shared_ptr< promise<T> > ptr;
|
typedef fc::shared_ptr< promise<T> > ptr;
|
||||||
promise( const char* desc = "?" ):promise_base(desc){}
|
promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){}
|
||||||
promise( const T& val ){ set_value(val); }
|
promise( const T& val ){ set_value(val); }
|
||||||
promise( T&& val ){ set_value(fc::move(val) ); }
|
promise( T&& val ){ set_value(fc::move(val) ); }
|
||||||
|
|
||||||
|
|
@ -128,8 +135,8 @@ namespace fc {
|
||||||
class promise<void> : virtual public promise_base {
|
class promise<void> : virtual public promise_base {
|
||||||
public:
|
public:
|
||||||
typedef fc::shared_ptr< promise<void> > ptr;
|
typedef fc::shared_ptr< promise<void> > ptr;
|
||||||
promise( const char* desc = "?" ):promise_base(desc){}
|
promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){}
|
||||||
promise( const void_t& ){ set_value(); }
|
//promise( const void_t& ){ set_value(); }
|
||||||
|
|
||||||
void wait(const microseconds& timeout = microseconds::maximum() ){
|
void wait(const microseconds& timeout = microseconds::maximum() ){
|
||||||
this->_wait( timeout );
|
this->_wait( timeout );
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ namespace fc {
|
||||||
class task : virtual public task_base, virtual public promise<R> {
|
class task : virtual public task_base, virtual public promise<R> {
|
||||||
public:
|
public:
|
||||||
template<typename Functor>
|
template<typename Functor>
|
||||||
task( Functor&& f ):task_base(&_functor) {
|
task( Functor&& f, const char* desc ):task_base(&_functor), promise_base(desc), promise<R>(desc) {
|
||||||
typedef typename fc::deduce<Functor>::type FunctorType;
|
typedef typename fc::deduce<Functor>::type FunctorType;
|
||||||
static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" );
|
static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" );
|
||||||
new ((char*)&_functor) FunctorType( fc::forward<Functor>(f) );
|
new ((char*)&_functor) FunctorType( fc::forward<Functor>(f) );
|
||||||
|
|
@ -78,7 +78,7 @@ namespace fc {
|
||||||
class task<void,FunctorSize> : virtual public task_base, virtual public promise<void> {
|
class task<void,FunctorSize> : virtual public task_base, virtual public promise<void> {
|
||||||
public:
|
public:
|
||||||
template<typename Functor>
|
template<typename Functor>
|
||||||
task( Functor&& f ):task_base(&_functor) {
|
task( Functor&& f, const char* desc ):task_base(&_functor), promise_base(desc), promise<void>(desc) {
|
||||||
typedef typename fc::deduce<Functor>::type FunctorType;
|
typedef typename fc::deduce<Functor>::type FunctorType;
|
||||||
static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" );
|
static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" );
|
||||||
new ((char*)&_functor) FunctorType( fc::forward<Functor>(f) );
|
new ((char*)&_functor) FunctorType( fc::forward<Functor>(f) );
|
||||||
|
|
|
||||||
|
|
@ -52,13 +52,13 @@ namespace fc {
|
||||||
* @param prio the priority relative to other tasks
|
* @param prio the priority relative to other tasks
|
||||||
*/
|
*/
|
||||||
template<typename Functor>
|
template<typename Functor>
|
||||||
auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future<decltype(f())> {
|
auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
|
||||||
typedef decltype(f()) Result;
|
typedef decltype(f()) Result;
|
||||||
typedef typename fc::deduce<Functor>::type FunctorType;
|
typedef typename fc::deduce<Functor>::type FunctorType;
|
||||||
fc::task<Result,sizeof(FunctorType)>* tsk =
|
fc::task<Result,sizeof(FunctorType)>* tsk =
|
||||||
new fc::task<Result,sizeof(FunctorType)>( fc::forward<Functor>(f) );
|
new fc::task<Result,sizeof(FunctorType)>( fc::forward<Functor>(f), desc );
|
||||||
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
|
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
|
||||||
async_task(tsk,prio,desc);
|
async_task(tsk,prio);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
void poke();
|
void poke();
|
||||||
|
|
@ -75,12 +75,12 @@ namespace fc {
|
||||||
*/
|
*/
|
||||||
template<typename Functor>
|
template<typename Functor>
|
||||||
auto schedule( Functor&& f, const fc::time_point& when,
|
auto schedule( Functor&& f, const fc::time_point& when,
|
||||||
const char* desc = "", priority prio = priority()) -> fc::future<decltype(f())> {
|
const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
|
||||||
typedef decltype(f()) Result;
|
typedef decltype(f()) Result;
|
||||||
fc::task<Result,sizeof(Functor)>* tsk =
|
fc::task<Result,sizeof(Functor)>* tsk =
|
||||||
new fc::task<Result,sizeof(Functor)>( fc::forward<Functor>(f) );
|
new fc::task<Result,sizeof(Functor)>( fc::forward<Functor>(f), desc );
|
||||||
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
|
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
|
||||||
async_task(tsk,prio,when,desc);
|
async_task(tsk,prio,when);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -133,8 +133,8 @@ namespace fc {
|
||||||
void exec();
|
void exec();
|
||||||
int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& );
|
int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& );
|
||||||
|
|
||||||
void async_task( task_base* t, const priority& p, const char* desc );
|
void async_task( task_base* t, const priority& p );
|
||||||
void async_task( task_base* t, const priority& p, const time_point& tp, const char* desc );
|
void async_task( task_base* t, const priority& p, const time_point& tp );
|
||||||
class thread_d* my;
|
class thread_d* my;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
@ -172,11 +172,11 @@ namespace fc {
|
||||||
int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& tp );
|
int wait_any_until( std::vector<promise_base::ptr>&& v, const time_point& tp );
|
||||||
|
|
||||||
template<typename Functor>
|
template<typename Functor>
|
||||||
auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future<decltype(f())> {
|
auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
|
||||||
return fc::thread::current().async( fc::forward<Functor>(f), desc, prio );
|
return fc::thread::current().async( fc::forward<Functor>(f), desc, prio );
|
||||||
}
|
}
|
||||||
template<typename Functor>
|
template<typename Functor>
|
||||||
auto schedule( Functor&& f, const fc::time_point& t, const char* desc ="", priority prio = priority()) -> fc::future<decltype(f())> {
|
auto schedule( Functor&& f, const fc::time_point& t, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future<decltype(f())> {
|
||||||
return fc::thread::current().schedule( fc::forward<Functor>(f), t, desc, prio );
|
return fc::thread::current().schedule( fc::forward<Functor>(f), t, desc, prio );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -120,7 +120,7 @@ namespace fc {
|
||||||
namespace tcp {
|
namespace tcp {
|
||||||
std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port) {
|
std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port) {
|
||||||
resolver res( fc::asio::default_io_service() );
|
resolver res( fc::asio::default_io_service() );
|
||||||
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >() );
|
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") );
|
||||||
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
|
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
|
||||||
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
|
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
|
||||||
return p->wait();;
|
return p->wait();;
|
||||||
|
|
@ -129,7 +129,7 @@ namespace fc {
|
||||||
namespace udp {
|
namespace udp {
|
||||||
std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port) {
|
std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port) {
|
||||||
resolver res( fc::asio::default_io_service() );
|
resolver res( fc::asio::default_io_service() );
|
||||||
promise<std::vector<endpoint> >::ptr p( new promise<std::vector<endpoint> >() );
|
promise<std::vector<endpoint> >::ptr p( new promise<std::vector<endpoint> >("udp::resolve completion") );
|
||||||
res.async_resolve( resolver::query(hostname,port),
|
res.async_resolve( resolver::query(hostname,port),
|
||||||
boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
|
boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
|
||||||
return p->wait();
|
return p->wait();
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ namespace fc {
|
||||||
std::cin.read(&c,1);
|
std::cin.read(&c,1);
|
||||||
while( !std::cin.eof() ) {
|
while( !std::cin.eof() ) {
|
||||||
while( write_pos - read_pos > 0xfffff ) {
|
while( write_pos - read_pos > 0xfffff ) {
|
||||||
fc::promise<void>::ptr wr( new fc::promise<void>() );
|
fc::promise<void>::ptr wr( new fc::promise<void>("cin_buffer::write_ready") );
|
||||||
write_ready = wr;
|
write_ready = wr;
|
||||||
if( write_pos - read_pos <= 0xfffff ) {
|
if( write_pos - read_pos <= 0xfffff ) {
|
||||||
wr->wait();
|
wr->wait();
|
||||||
|
|
@ -141,7 +141,7 @@ namespace fc {
|
||||||
do {
|
do {
|
||||||
while( !b.eof && (b.write_pos - b.read_pos)==0 ){
|
while( !b.eof && (b.write_pos - b.read_pos)==0 ){
|
||||||
// wait for more...
|
// wait for more...
|
||||||
fc::promise<void>::ptr rr( new fc::promise<void>() );
|
fc::promise<void>::ptr rr( new fc::promise<void>("cin_buffer::read_ready") );
|
||||||
{ // copy read_ready because it is accessed from multiple threads
|
{ // copy read_ready because it is accessed from multiple threads
|
||||||
fc::scoped_lock<boost::mutex> lock( b.read_ready_mutex );
|
fc::scoped_lock<boost::mutex> lock( b.read_ready_mutex );
|
||||||
b.read_ready = rr;
|
b.read_ready = rr;
|
||||||
|
|
|
||||||
|
|
@ -28,9 +28,9 @@ namespace fc {
|
||||||
|
|
||||||
time_point_sec get_file_start_time( const time_point_sec& timestamp, const microseconds& interval )
|
time_point_sec get_file_start_time( const time_point_sec& timestamp, const microseconds& interval )
|
||||||
{
|
{
|
||||||
const auto interval_seconds = interval.to_seconds();
|
int64_t interval_seconds = interval.to_seconds();
|
||||||
const auto file_number = timestamp.sec_since_epoch() / interval_seconds;
|
int64_t file_number = timestamp.sec_since_epoch() / interval_seconds;
|
||||||
return time_point_sec( file_number * interval_seconds );
|
return time_point_sec( (uint32_t)(file_number * interval_seconds) );
|
||||||
}
|
}
|
||||||
|
|
||||||
string timestamp_to_string( const time_point_sec& timestamp )
|
string timestamp_to_string( const time_point_sec& timestamp )
|
||||||
|
|
@ -74,7 +74,7 @@ namespace fc {
|
||||||
if( cfg.rotation_compression )
|
if( cfg.rotation_compression )
|
||||||
_compression_thread.reset( new thread( "compression") );
|
_compression_thread.reset( new thread( "compression") );
|
||||||
|
|
||||||
_rotation_task = async( [this]() { rotate_files( true ); }, "rotate_files" );
|
_rotation_task = async( [this]() { rotate_files( true ); }, "rotate_files(1)" );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -114,7 +114,7 @@ namespace fc {
|
||||||
{
|
{
|
||||||
if( start_time <= _current_file_start_time )
|
if( start_time <= _current_file_start_time )
|
||||||
{
|
{
|
||||||
_rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "log_rotation_task" );
|
_rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "rotate_files(2)" );
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -160,7 +160,7 @@ namespace fc {
|
||||||
}
|
}
|
||||||
|
|
||||||
_current_file_start_time = start_time;
|
_current_file_start_time = start_time;
|
||||||
_rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds() );
|
_rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "rotate_files(3)" );
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
file_appender::config::config( const fc::path& p )
|
file_appender::config::config( const fc::path& p )
|
||||||
|
|
|
||||||
|
|
@ -207,7 +207,7 @@ namespace fc
|
||||||
size_t bytes_read;
|
size_t bytes_read;
|
||||||
if (_download_bytes_per_second)
|
if (_download_bytes_per_second)
|
||||||
{
|
{
|
||||||
promise<size_t>::ptr completion_promise(new promise<size_t>());
|
promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::readsome"));
|
||||||
rate_limited_tcp_read_operation read_operation(socket, buffer, length, completion_promise);
|
rate_limited_tcp_read_operation read_operation(socket, buffer, length, completion_promise);
|
||||||
_read_operations_for_next_iteration.push_back(&read_operation);
|
_read_operations_for_next_iteration.push_back(&read_operation);
|
||||||
|
|
||||||
|
|
@ -232,7 +232,7 @@ namespace fc
|
||||||
size_t bytes_written;
|
size_t bytes_written;
|
||||||
if (_upload_bytes_per_second)
|
if (_upload_bytes_per_second)
|
||||||
{
|
{
|
||||||
promise<size_t>::ptr completion_promise(new promise<size_t>());
|
promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::writesome"));
|
||||||
rate_limited_tcp_write_operation write_operation(socket, buffer, length, completion_promise);
|
rate_limited_tcp_write_operation write_operation(socket, buffer, length, completion_promise);
|
||||||
_write_operations_for_next_iteration.push_back(&write_operation);
|
_write_operations_for_next_iteration.push_back(&write_operation);
|
||||||
|
|
||||||
|
|
@ -259,7 +259,7 @@ namespace fc
|
||||||
process_pending_operations(_last_read_iteration_time, _download_bytes_per_second,
|
process_pending_operations(_last_read_iteration_time, _download_bytes_per_second,
|
||||||
_read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens);
|
_read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens);
|
||||||
|
|
||||||
_new_read_operation_available_promise = new promise<void>();
|
_new_read_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_reads");
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (_read_operations_in_progress.empty())
|
if (_read_operations_in_progress.empty())
|
||||||
|
|
@ -280,7 +280,7 @@ namespace fc
|
||||||
process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second,
|
process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second,
|
||||||
_write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens);
|
_write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens);
|
||||||
|
|
||||||
_new_write_operation_available_promise = new promise<void>();
|
_new_write_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_writes");
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (_write_operations_in_progress.empty())
|
if (_write_operations_in_progress.empty())
|
||||||
|
|
|
||||||
|
|
@ -194,7 +194,7 @@ namespace fc {
|
||||||
connect_thread.async( [&](){
|
connect_thread.async( [&](){
|
||||||
if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) )
|
if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) )
|
||||||
check_udt_errors();
|
check_udt_errors();
|
||||||
}).wait();
|
}, "udt_socket::connect_to").wait();
|
||||||
|
|
||||||
bool block = false;
|
bool block = false;
|
||||||
UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool));
|
UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool));
|
||||||
|
|
|
||||||
|
|
@ -324,7 +324,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variants& args )
|
future<variant> json_connection::async_call( const fc::string& method, const variants& args )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -350,7 +350,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant& a1 )
|
future<variant> json_connection::async_call( const fc::string& method, const variant& a1 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -368,7 +368,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2 )
|
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -388,7 +388,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3 )
|
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -411,7 +411,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4 )
|
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -436,7 +436,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5 )
|
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -463,7 +463,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6 )
|
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -491,7 +491,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6, const variant& a7 )
|
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6, const variant& a7 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -525,7 +525,7 @@ namespace fc { namespace rpc {
|
||||||
const variant& a4, const variant& a5, const variant& a6, const variant& a7, const variant& a8 )
|
const variant& a4, const variant& a5, const variant& a6, const variant& a7, const variant& a8 )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
|
|
||||||
{
|
{
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
|
|
@ -562,7 +562,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method, const variant_object& named_args )
|
future<variant> json_connection::async_call( const fc::string& method, const variant_object& named_args )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
{
|
{
|
||||||
*my->_out << "{\"id\":";
|
*my->_out << "{\"id\":";
|
||||||
|
|
@ -579,7 +579,7 @@ namespace fc { namespace rpc {
|
||||||
future<variant> json_connection::async_call( const fc::string& method )
|
future<variant> json_connection::async_call( const fc::string& method )
|
||||||
{
|
{
|
||||||
auto id = my->_next_id++;
|
auto id = my->_next_id++;
|
||||||
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
|
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
|
||||||
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
fc::scoped_lock<fc::mutex> lock(my->_write_mutex);
|
||||||
{
|
{
|
||||||
*my->_out << "{\"id\":";
|
*my->_out << "{\"id\":";
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
namespace fc {
|
namespace fc {
|
||||||
task_base::task_base(void* func)
|
task_base::task_base(void* func)
|
||||||
:
|
:
|
||||||
|
promise_base("task_base"),
|
||||||
_posted_num(0),
|
_posted_num(0),
|
||||||
_active_context(nullptr),
|
_active_context(nullptr),
|
||||||
_next(nullptr),
|
_next(nullptr),
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ namespace fc {
|
||||||
}
|
}
|
||||||
|
|
||||||
thread::thread( const std::string& name ) {
|
thread::thread( const std::string& name ) {
|
||||||
promise<void>::ptr p(new promise<void>());
|
promise<void>::ptr p(new promise<void>("thread start"));
|
||||||
boost::thread* t = new boost::thread( [this,p,name]() {
|
boost::thread* t = new boost::thread( [this,p,name]() {
|
||||||
try {
|
try {
|
||||||
set_thread_name(name.c_str()); // set thread's name for the debugger to display
|
set_thread_name(name.c_str()); // set thread's name for the debugger to display
|
||||||
|
|
@ -277,8 +277,8 @@ namespace fc {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void thread::async_task( task_base* t, const priority& p, const char* desc ) {
|
void thread::async_task( task_base* t, const priority& p ) {
|
||||||
async_task( t, p, time_point::min(), desc );
|
async_task( t, p, time_point::min() );
|
||||||
}
|
}
|
||||||
|
|
||||||
void thread::poke() {
|
void thread::poke() {
|
||||||
|
|
@ -286,10 +286,9 @@ namespace fc {
|
||||||
my->task_ready.notify_one();
|
my->task_ready.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) {
|
void thread::async_task( task_base* t, const priority& p, const time_point& tp ) {
|
||||||
assert(my);
|
assert(my);
|
||||||
t->_when = tp;
|
t->_when = tp;
|
||||||
t->_desc = desc;
|
|
||||||
// slog( "when %lld", t->_when.time_since_epoch().count() );
|
// slog( "when %lld", t->_when.time_since_epoch().count() );
|
||||||
// slog( "delay %lld", (tp - fc::time_point::now()).count() );
|
// slog( "delay %lld", (tp - fc::time_point::now()).count() );
|
||||||
task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
|
task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
|
||||||
|
|
|
||||||
|
|
@ -487,7 +487,7 @@ namespace fc {
|
||||||
|
|
||||||
void unblock( fc::context* c ) {
|
void unblock( fc::context* c ) {
|
||||||
if( fc::thread::current().my != this ) {
|
if( fc::thread::current().my != this ) {
|
||||||
self.async( [=](){ unblock(c); } );
|
self.async( [=](){ unblock(c); }, "thread_d::unblock" );
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if( c != current ) ready_push_front(c);
|
if( c != current ) ready_push_front(c);
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ BOOST_AUTO_TEST_CASE( cancel_an_active_task )
|
||||||
{
|
{
|
||||||
return sleep_aborted;
|
return sleep_aborted;
|
||||||
}
|
}
|
||||||
});
|
}, "test_task");
|
||||||
|
|
||||||
fc::time_point start_time = fc::time_point::now();
|
fc::time_point start_time = fc::time_point::now();
|
||||||
|
|
||||||
|
|
@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task )
|
||||||
{
|
{
|
||||||
BOOST_TEST_MESSAGE("Caught exception in async task, leaving the task's functor");
|
BOOST_TEST_MESSAGE("Caught exception in async task, leaving the task's functor");
|
||||||
}
|
}
|
||||||
});
|
}, "test_task");
|
||||||
std::weak_ptr<std::string> weak_string_ptr(some_string);
|
std::weak_ptr<std::string> weak_string_ptr(some_string);
|
||||||
some_string.reset();
|
some_string.reset();
|
||||||
BOOST_CHECK_MESSAGE(!weak_string_ptr.expired(), "Weak pointer should still be valid because async task should be holding the strong pointer");
|
BOOST_CHECK_MESSAGE(!weak_string_ptr.expired(), "Weak pointer should still be valid because async task should be holding the strong pointer");
|
||||||
|
|
@ -75,18 +75,30 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task )
|
||||||
BOOST_CHECK_MESSAGE(weak_string_ptr.expired(), "Weak pointer should now be invalid because async task should have been destroyed");
|
BOOST_CHECK_MESSAGE(weak_string_ptr.expired(), "Weak pointer should now be invalid because async task should have been destroyed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int task_execute_count = 0;
|
||||||
|
fc::future<void> simple_task_done;
|
||||||
|
void simple_task()
|
||||||
|
{
|
||||||
|
task_execute_count++;
|
||||||
|
simple_task_done = fc::schedule([](){ simple_task(); },
|
||||||
|
fc::time_point::now() + fc::seconds(3),
|
||||||
|
"simple_task");
|
||||||
|
}
|
||||||
|
|
||||||
BOOST_AUTO_TEST_CASE( cancel_scheduled_task )
|
BOOST_AUTO_TEST_CASE( cancel_scheduled_task )
|
||||||
{
|
{
|
||||||
bool task_executed = false;
|
bool task_executed = false;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto result = fc::schedule( [&]() { task_executed = true; }, fc::time_point::now() + fc::seconds(3) );
|
simple_task();
|
||||||
result.cancel();
|
simple_task();
|
||||||
result.wait();
|
fc::usleep(fc::seconds(4));
|
||||||
|
simple_task_done.cancel();
|
||||||
|
simple_task_done.wait();
|
||||||
}
|
}
|
||||||
catch ( const fc::exception& e )
|
catch ( const fc::exception& e )
|
||||||
{
|
{
|
||||||
wlog( "${e}", ("e",e.to_detail_string() ) );
|
wlog( "${e}", ("e",e.to_detail_string() ) );
|
||||||
}
|
}
|
||||||
BOOST_CHECK(!task_executed);
|
BOOST_CHECK_EQUAL(task_execute_count, 2);
|
||||||
}
|
}
|
||||||
Loading…
Reference in a new issue