diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index 2ae2ef6..3e544ee 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -58,7 +58,7 @@ namespace fc { const char* get_desc()const; - void cancel(); + virtual void cancel(); bool canceled()const { return _canceled; } bool ready()const; bool error()const; diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index 033caec..ac76cf8 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -10,7 +10,9 @@ namespace fc { class task_base : virtual public promise_base { public: - void run(); + void run(); + virtual void cancel() override; + protected: ~task_base(); /// Task priority looks like unsupported feature. diff --git a/src/asio.cpp b/src/asio.cpp index 357f1ad..befa2ed 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -118,22 +118,32 @@ namespace fc { } namespace tcp { - std::vector resolve( const std::string& hostname, const std::string& port) { - resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >("tcp::resolve completion") ); - res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), - boost::bind( detail::resolve_handler, p, _1, _2 ) ); - return p->wait();; + std::vector resolve( const std::string& hostname, const std::string& port) + { + try + { + resolver res( fc::asio::default_io_service() ); + promise >::ptr p( new promise >("tcp::resolve completion") ); + res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), + boost::bind( detail::resolve_handler, p, _1, _2 ) ); + return p->wait();; } + FC_RETHROW_EXCEPTIONS(warn, "") + } } namespace udp { - std::vector resolve( resolver& r, const std::string& hostname, const std::string& port) { - resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >("udp::resolve completion") ); - res.async_resolve( resolver::query(hostname,port), - boost::bind( detail::resolve_handler, p, _1, _2 ) ); - return p->wait(); + std::vector resolve( resolver& r, const std::string& hostname, const std::string& port) + { + try + { + resolver res( fc::asio::default_io_service() ); + promise >::ptr p( new promise >("udp::resolve completion") ); + res.async_resolve( resolver::query(hostname,port), + boost::bind( detail::resolve_handler, p, _1, _2 ) ); + return p->wait(); } + FC_RETHROW_EXCEPTIONS(warn, "") + } } } } // namespace fc::asio diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index 3d9fb3e..3cf4757 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -68,12 +68,14 @@ namespace fc } } // request_now + //started for first time in ntp() constructor, canceled in ~ntp() destructor void request_time_task() { + assert(_ntp_thread.is_current()); request_now(); - _request_time_task_done = _ntp_thread.schedule( [=](){ request_time_task(); }, - fc::time_point::now() + fc::seconds(_request_interval_sec), - "request_time_task" ); + _request_time_task_done = schedule( [=](){ request_time_task(); }, + fc::time_point::now() + fc::seconds(_request_interval_sec), + "request_time_task" ); } // request_loop void read_loop() diff --git a/src/thread/task.cpp b/src/thread/task.cpp index b60ed25..3188145 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -3,6 +3,7 @@ #include #include #include +#include "context.hpp" #include #include @@ -34,6 +35,7 @@ namespace fc { } #endif } + void task_base::run_impl() { try { if( !canceled() ) @@ -50,6 +52,16 @@ namespace fc { set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); } } + + void task_base::cancel() + { + promise_base::cancel(); + if (_active_context) + { + _active_context->canceled = true; + } + } + task_base::~task_base() { _destroy_functor( _functor ); }