From d847f6469ad7771fed1f52aa9e466a7365af9c67 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Sun, 27 Jul 2014 17:37:21 -0400 Subject: [PATCH] Allow us to require assigning descriptions to all async tasks to aid in debugging --- include/fc/signals.hpp | 4 ++-- include/fc/thread/future.hpp | 15 +++++++++++---- include/fc/thread/task.hpp | 4 ++-- include/fc/thread/thread.hpp | 20 ++++++++++---------- src/asio.cpp | 4 ++-- src/io/iostream.cpp | 4 ++-- src/log/file_appender.cpp | 12 ++++++------ src/network/rate_limiting.cpp | 8 ++++---- src/network/udt_socket.cpp | 2 +- src/rpc/json_connection.cpp | 22 +++++++++++----------- src/thread/task.cpp | 1 + src/thread/thread.cpp | 9 ++++----- src/thread/thread_d.hpp | 2 +- tests/task_cancel.cpp | 24 ++++++++++++++++++------ 14 files changed, 75 insertions(+), 56 deletions(-) diff --git a/include/fc/signals.hpp b/include/fc/signals.hpp index 632f4f4..9a7fc12 100644 --- a/include/fc/signals.hpp +++ b/include/fc/signals.hpp @@ -26,13 +26,13 @@ namespace fc { template inline T wait( boost::signals2::signal& sig, const microseconds& timeout_us=microseconds::maximum() ) { - typename promise::ptr p(new promise()); + typename promise::ptr p(new promise("fc::signal::wait")); boost::signals2::scoped_connection c( sig.connect( [=]( T t ) { p->set_value(t); } )); return p->wait( timeout_us ); } inline void wait( boost::signals2::signal& sig, const microseconds& timeout_us=microseconds::maximum() ) { - promise::ptr p(new promise()); + promise::ptr p(new promise("fc::signal::wait")); boost::signals2::scoped_connection c( sig.connect( [=]() { p->set_value(); } )); p->wait( timeout_us ); } diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index 4c0757c..2ae2ef6 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -6,6 +6,13 @@ #include #include +//#define FC_TASK_NAMES_ARE_MANDATORY 1 +#ifdef FC_TASK_NAMES_ARE_MANDATORY +# define FC_TASK_NAME_DEFAULT_ARG +#else +# define FC_TASK_NAME_DEFAULT_ARG = "?" +#endif + namespace fc { class abstract_thread; struct void_t{}; @@ -47,7 +54,7 @@ namespace fc { class promise_base : public virtual retainable{ public: typedef fc::shared_ptr ptr; - promise_base(const char* desc="?"); + promise_base(const char* desc FC_TASK_NAME_DEFAULT_ARG); const char* get_desc()const; @@ -92,7 +99,7 @@ namespace fc { class promise : virtual public promise_base { public: typedef fc::shared_ptr< promise > ptr; - promise( const char* desc = "?" ):promise_base(desc){} + promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} promise( const T& val ){ set_value(val); } promise( T&& val ){ set_value(fc::move(val) ); } @@ -128,8 +135,8 @@ namespace fc { class promise : virtual public promise_base { public: typedef fc::shared_ptr< promise > ptr; - promise( const char* desc = "?" ):promise_base(desc){} - promise( const void_t& ){ set_value(); } + promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} + //promise( const void_t& ){ set_value(); } void wait(const microseconds& timeout = microseconds::maximum() ){ this->_wait( timeout ); diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index 5c1b30e..22e8ce5 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -61,7 +61,7 @@ namespace fc { class task : virtual public task_base, virtual public promise { public: template - task( Functor&& f ):task_base(&_functor) { + task( Functor&& f, const char* desc ):task_base(&_functor), promise_base(desc), promise(desc) { typedef typename fc::deduce::type FunctorType; static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" ); new ((char*)&_functor) FunctorType( fc::forward(f) ); @@ -78,7 +78,7 @@ namespace fc { class task : virtual public task_base, virtual public promise { public: template - task( Functor&& f ):task_base(&_functor) { + task( Functor&& f, const char* desc ):task_base(&_functor), promise_base(desc), promise(desc) { typedef typename fc::deduce::type FunctorType; static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" ); new ((char*)&_functor) FunctorType( fc::forward(f) ); diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index a133ba4..e6f01c8 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -52,13 +52,13 @@ namespace fc { * @param prio the priority relative to other tasks */ template - auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future { + auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { typedef decltype(f()) Result; typedef typename fc::deduce::type FunctorType; fc::task* tsk = - new fc::task( fc::forward(f) ); + new fc::task( fc::forward(f), desc ); fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - async_task(tsk,prio,desc); + async_task(tsk,prio); return r; } void poke(); @@ -75,12 +75,12 @@ namespace fc { */ template auto schedule( Functor&& f, const fc::time_point& when, - const char* desc = "", priority prio = priority()) -> fc::future { + const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { typedef decltype(f()) Result; fc::task* tsk = - new fc::task( fc::forward(f) ); + new fc::task( fc::forward(f), desc ); fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - async_task(tsk,prio,when,desc); + async_task(tsk,prio,when); return r; } @@ -133,8 +133,8 @@ namespace fc { void exec(); int wait_any_until( std::vector&& v, const time_point& ); - void async_task( task_base* t, const priority& p, const char* desc ); - void async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ); + void async_task( task_base* t, const priority& p ); + void async_task( task_base* t, const priority& p, const time_point& tp ); class thread_d* my; }; @@ -172,11 +172,11 @@ namespace fc { int wait_any_until( std::vector&& v, const time_point& tp ); template - auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future { + auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { return fc::thread::current().async( fc::forward(f), desc, prio ); } template - auto schedule( Functor&& f, const fc::time_point& t, const char* desc ="", priority prio = priority()) -> fc::future { + auto schedule( Functor&& f, const fc::time_point& t, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { return fc::thread::current().schedule( fc::forward(f), t, desc, prio ); } diff --git a/src/asio.cpp b/src/asio.cpp index e618c2d..357f1ad 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -120,7 +120,7 @@ namespace fc { namespace tcp { std::vector resolve( const std::string& hostname, const std::string& port) { resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >() ); + promise >::ptr p( new promise >("tcp::resolve completion") ); res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), boost::bind( detail::resolve_handler, p, _1, _2 ) ); return p->wait();; @@ -129,7 +129,7 @@ namespace fc { namespace udp { std::vector resolve( resolver& r, const std::string& hostname, const std::string& port) { resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >() ); + promise >::ptr p( new promise >("udp::resolve completion") ); res.async_resolve( resolver::query(hostname,port), boost::bind( detail::resolve_handler, p, _1, _2 ) ); return p->wait(); diff --git a/src/io/iostream.cpp b/src/io/iostream.cpp index 7ad272d..6eef009 100644 --- a/src/io/iostream.cpp +++ b/src/io/iostream.cpp @@ -25,7 +25,7 @@ namespace fc { std::cin.read(&c,1); while( !std::cin.eof() ) { while( write_pos - read_pos > 0xfffff ) { - fc::promise::ptr wr( new fc::promise() ); + fc::promise::ptr wr( new fc::promise("cin_buffer::write_ready") ); write_ready = wr; if( write_pos - read_pos <= 0xfffff ) { wr->wait(); @@ -141,7 +141,7 @@ namespace fc { do { while( !b.eof && (b.write_pos - b.read_pos)==0 ){ // wait for more... - fc::promise::ptr rr( new fc::promise() ); + fc::promise::ptr rr( new fc::promise("cin_buffer::read_ready") ); { // copy read_ready because it is accessed from multiple threads fc::scoped_lock lock( b.read_ready_mutex ); b.read_ready = rr; diff --git a/src/log/file_appender.cpp b/src/log/file_appender.cpp index d53c1fc..014fd09 100644 --- a/src/log/file_appender.cpp +++ b/src/log/file_appender.cpp @@ -28,9 +28,9 @@ namespace fc { time_point_sec get_file_start_time( const time_point_sec& timestamp, const microseconds& interval ) { - const auto interval_seconds = interval.to_seconds(); - const auto file_number = timestamp.sec_since_epoch() / interval_seconds; - return time_point_sec( file_number * interval_seconds ); + int64_t interval_seconds = interval.to_seconds(); + int64_t file_number = timestamp.sec_since_epoch() / interval_seconds; + return time_point_sec( (uint32_t)(file_number * interval_seconds) ); } string timestamp_to_string( const time_point_sec& timestamp ) @@ -74,7 +74,7 @@ namespace fc { if( cfg.rotation_compression ) _compression_thread.reset( new thread( "compression") ); - _rotation_task = async( [this]() { rotate_files( true ); }, "rotate_files" ); + _rotation_task = async( [this]() { rotate_files( true ); }, "rotate_files(1)" ); } } @@ -114,7 +114,7 @@ namespace fc { { if( start_time <= _current_file_start_time ) { - _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "log_rotation_task" ); + _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "rotate_files(2)" ); return; } @@ -160,7 +160,7 @@ namespace fc { } _current_file_start_time = start_time; - _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds() ); + _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "rotate_files(3)" ); } }; file_appender::config::config( const fc::path& p ) diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index ad90f4b..aac30ae 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -207,7 +207,7 @@ namespace fc size_t bytes_read; if (_download_bytes_per_second) { - promise::ptr completion_promise(new promise()); + promise::ptr completion_promise(new promise("rate_limiting_group_impl::readsome")); rate_limited_tcp_read_operation read_operation(socket, buffer, length, completion_promise); _read_operations_for_next_iteration.push_back(&read_operation); @@ -232,7 +232,7 @@ namespace fc size_t bytes_written; if (_upload_bytes_per_second) { - promise::ptr completion_promise(new promise()); + promise::ptr completion_promise(new promise("rate_limiting_group_impl::writesome")); rate_limited_tcp_write_operation write_operation(socket, buffer, length, completion_promise); _write_operations_for_next_iteration.push_back(&write_operation); @@ -259,7 +259,7 @@ namespace fc process_pending_operations(_last_read_iteration_time, _download_bytes_per_second, _read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens); - _new_read_operation_available_promise = new promise(); + _new_read_operation_available_promise = new promise("rate_limiting_group_impl::process_pending_reads"); try { if (_read_operations_in_progress.empty()) @@ -280,7 +280,7 @@ namespace fc process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second, _write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens); - _new_write_operation_available_promise = new promise(); + _new_write_operation_available_promise = new promise("rate_limiting_group_impl::process_pending_writes"); try { if (_write_operations_in_progress.empty()) diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp index 464c55c..0018238 100644 --- a/src/network/udt_socket.cpp +++ b/src/network/udt_socket.cpp @@ -194,7 +194,7 @@ namespace fc { connect_thread.async( [&](){ if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) ) check_udt_errors(); - }).wait(); + }, "udt_socket::connect_to").wait(); bool block = false; UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool)); diff --git a/src/rpc/json_connection.cpp b/src/rpc/json_connection.cpp index fa6273b..06d3fa4 100644 --- a/src/rpc/json_connection.cpp +++ b/src/rpc/json_connection.cpp @@ -324,7 +324,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variants& args ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -350,7 +350,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -368,7 +368,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -388,7 +388,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -411,7 +411,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -436,7 +436,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -463,7 +463,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -491,7 +491,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6, const variant& a7 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -525,7 +525,7 @@ namespace fc { namespace rpc { const variant& a4, const variant& a5, const variant& a6, const variant& a7, const variant& a8 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -562,7 +562,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant_object& named_args ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); fc::scoped_lock lock(my->_write_mutex); { *my->_out << "{\"id\":"; @@ -579,7 +579,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); fc::scoped_lock lock(my->_write_mutex); { *my->_out << "{\"id\":"; diff --git a/src/thread/task.cpp b/src/thread/task.cpp index c3e1feb..b60ed25 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -15,6 +15,7 @@ namespace fc { task_base::task_base(void* func) : + promise_base("task_base"), _posted_num(0), _active_context(nullptr), _next(nullptr), diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index aa00c65..260907e 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -71,7 +71,7 @@ namespace fc { } thread::thread( const std::string& name ) { - promise::ptr p(new promise()); + promise::ptr p(new promise("thread start")); boost::thread* t = new boost::thread( [this,p,name]() { try { set_thread_name(name.c_str()); // set thread's name for the debugger to display @@ -277,8 +277,8 @@ namespace fc { return -1; } - void thread::async_task( task_base* t, const priority& p, const char* desc ) { - async_task( t, p, time_point::min(), desc ); + void thread::async_task( task_base* t, const priority& p ) { + async_task( t, p, time_point::min() ); } void thread::poke() { @@ -286,10 +286,9 @@ namespace fc { my->task_ready.notify_one(); } - void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) { + void thread::async_task( task_base* t, const priority& p, const time_point& tp ) { assert(my); t->_when = tp; - t->_desc = desc; // slog( "when %lld", t->_when.time_since_epoch().count() ); // slog( "delay %lld", (tp - fc::time_point::now()).count() ); task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed); diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 314e0a4..f533dd7 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -487,7 +487,7 @@ namespace fc { void unblock( fc::context* c ) { if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); } ); + self.async( [=](){ unblock(c); }, "thread_d::unblock" ); return; } if( c != current ) ready_push_front(c); diff --git a/tests/task_cancel.cpp b/tests/task_cancel.cpp index e63544c..c0c76f6 100644 --- a/tests/task_cancel.cpp +++ b/tests/task_cancel.cpp @@ -19,7 +19,7 @@ BOOST_AUTO_TEST_CASE( cancel_an_active_task ) { return sleep_aborted; } - }); + }, "test_task"); fc::time_point start_time = fc::time_point::now(); @@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task ) { BOOST_TEST_MESSAGE("Caught exception in async task, leaving the task's functor"); } - }); + }, "test_task"); std::weak_ptr weak_string_ptr(some_string); some_string.reset(); BOOST_CHECK_MESSAGE(!weak_string_ptr.expired(), "Weak pointer should still be valid because async task should be holding the strong pointer"); @@ -75,18 +75,30 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task ) BOOST_CHECK_MESSAGE(weak_string_ptr.expired(), "Weak pointer should now be invalid because async task should have been destroyed"); } +int task_execute_count = 0; +fc::future simple_task_done; +void simple_task() +{ + task_execute_count++; + simple_task_done = fc::schedule([](){ simple_task(); }, + fc::time_point::now() + fc::seconds(3), + "simple_task"); +} + BOOST_AUTO_TEST_CASE( cancel_scheduled_task ) { bool task_executed = false; try { - auto result = fc::schedule( [&]() { task_executed = true; }, fc::time_point::now() + fc::seconds(3) ); - result.cancel(); - result.wait(); + simple_task(); + simple_task(); + fc::usleep(fc::seconds(4)); + simple_task_done.cancel(); + simple_task_done.wait(); } catch ( const fc::exception& e ) { wlog( "${e}", ("e",e.to_detail_string() ) ); } - BOOST_CHECK(!task_executed); + BOOST_CHECK_EQUAL(task_execute_count, 2); } \ No newline at end of file