Set canceled flag in task's context object so that we cancel out of active tasks when they try to resume. Wrap a try-catch block around tcp and udp resolve requests to convert any non-fc exceptions to fc exceptions. Minor change to clarify design intent that request_time_task should always be running in _ntp_thread.

This commit is contained in:
dnotestein 2014-08-01 11:31:36 -04:00
parent eae493a8c1
commit 7cc69f3bb7
5 changed files with 43 additions and 17 deletions

View file

@ -58,7 +58,7 @@ namespace fc {
const char* get_desc()const; const char* get_desc()const;
void cancel(); virtual void cancel();
bool canceled()const { return _canceled; } bool canceled()const { return _canceled; }
bool ready()const; bool ready()const;
bool error()const; bool error()const;

View file

@ -10,7 +10,9 @@ namespace fc {
class task_base : virtual public promise_base { class task_base : virtual public promise_base {
public: public:
void run(); void run();
virtual void cancel() override;
protected: protected:
~task_base(); ~task_base();
/// Task priority looks like unsupported feature. /// Task priority looks like unsupported feature.

View file

@ -118,22 +118,32 @@ namespace fc {
} }
namespace tcp { namespace tcp {
std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port) { std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port)
resolver res( fc::asio::default_io_service() ); {
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") ); try
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), {
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) ); resolver res( fc::asio::default_io_service() );
return p->wait();; promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") );
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait();;
} }
FC_RETHROW_EXCEPTIONS(warn, "")
}
} }
namespace udp { namespace udp {
std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port) { std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port)
resolver res( fc::asio::default_io_service() ); {
promise<std::vector<endpoint> >::ptr p( new promise<std::vector<endpoint> >("udp::resolve completion") ); try
res.async_resolve( resolver::query(hostname,port), {
boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) ); resolver res( fc::asio::default_io_service() );
return p->wait(); promise<std::vector<endpoint> >::ptr p( new promise<std::vector<endpoint> >("udp::resolve completion") );
res.async_resolve( resolver::query(hostname,port),
boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait();
} }
FC_RETHROW_EXCEPTIONS(warn, "")
}
} }
} } // namespace fc::asio } } // namespace fc::asio

View file

@ -68,12 +68,14 @@ namespace fc
} }
} // request_now } // request_now
//started for first time in ntp() constructor, canceled in ~ntp() destructor
void request_time_task() void request_time_task()
{ {
assert(_ntp_thread.is_current());
request_now(); request_now();
_request_time_task_done = _ntp_thread.schedule( [=](){ request_time_task(); }, _request_time_task_done = schedule( [=](){ request_time_task(); },
fc::time_point::now() + fc::seconds(_request_interval_sec), fc::time_point::now() + fc::seconds(_request_interval_sec),
"request_time_task" ); "request_time_task" );
} // request_loop } // request_loop
void read_loop() void read_loop()

View file

@ -3,6 +3,7 @@
#include <fc/thread/unique_lock.hpp> #include <fc/thread/unique_lock.hpp>
#include <fc/thread/spin_lock.hpp> #include <fc/thread/spin_lock.hpp>
#include <fc/fwd_impl.hpp> #include <fc/fwd_impl.hpp>
#include "context.hpp"
#include <fc/log/logger.hpp> #include <fc/log/logger.hpp>
#include <boost/exception/all.hpp> #include <boost/exception/all.hpp>
@ -34,6 +35,7 @@ namespace fc {
} }
#endif #endif
} }
void task_base::run_impl() { void task_base::run_impl() {
try { try {
if( !canceled() ) if( !canceled() )
@ -50,6 +52,16 @@ namespace fc {
set_exception( std::make_shared<unhandled_exception>( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); set_exception( std::make_shared<unhandled_exception>( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) );
} }
} }
void task_base::cancel()
{
promise_base::cancel();
if (_active_context)
{
_active_context->canceled = true;
}
}
task_base::~task_base() { task_base::~task_base() {
_destroy_functor( _functor ); _destroy_functor( _functor );
} }