diff --git a/CMakeLists.txt b/CMakeLists.txt index fb27c3c..773adae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -202,7 +202,9 @@ ELSE() IF(APPLE) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -stdlib=libc++ -Wall") ELSE() - target_compile_options(fc PUBLIC -std=c++11 -Wall -fnon-call-exceptions) + if( NOT "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang" ) + target_compile_options(fc PUBLIC -std=c++11 -Wall -fnon-call-exceptions) + endif() SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -fnon-call-exceptions") ENDIF() ENDIF() diff --git a/include/fc/signals.hpp b/include/fc/signals.hpp index 632f4f4..9a7fc12 100644 --- a/include/fc/signals.hpp +++ b/include/fc/signals.hpp @@ -26,13 +26,13 @@ namespace fc { template inline T wait( boost::signals2::signal& sig, const microseconds& timeout_us=microseconds::maximum() ) { - typename promise::ptr p(new promise()); + typename promise::ptr p(new promise("fc::signal::wait")); boost::signals2::scoped_connection c( sig.connect( [=]( T t ) { p->set_value(t); } )); return p->wait( timeout_us ); } inline void wait( boost::signals2::signal& sig, const microseconds& timeout_us=microseconds::maximum() ) { - promise::ptr p(new promise()); + promise::ptr p(new promise("fc::signal::wait")); boost::signals2::scoped_connection c( sig.connect( [=]() { p->set_value(); } )); p->wait( timeout_us ); } diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index 86e8a96..3e544ee 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -6,6 +6,13 @@ #include #include +//#define FC_TASK_NAMES_ARE_MANDATORY 1 +#ifdef FC_TASK_NAMES_ARE_MANDATORY +# define FC_TASK_NAME_DEFAULT_ARG +#else +# define FC_TASK_NAME_DEFAULT_ARG = "?" +#endif + namespace fc { class abstract_thread; struct void_t{}; @@ -47,11 +54,11 @@ namespace fc { class promise_base : public virtual retainable{ public: typedef fc::shared_ptr ptr; - promise_base(const char* desc="?"); + promise_base(const char* desc FC_TASK_NAME_DEFAULT_ARG); const char* get_desc()const; - void cancel(); + virtual void cancel(); bool canceled()const { return _canceled; } bool ready()const; bool error()const; @@ -92,7 +99,7 @@ namespace fc { class promise : virtual public promise_base { public: typedef fc::shared_ptr< promise > ptr; - promise( const char* desc = "?" ):promise_base(desc){} + promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} promise( const T& val ){ set_value(val); } promise( T&& val ){ set_value(fc::move(val) ); } @@ -128,8 +135,8 @@ namespace fc { class promise : virtual public promise_base { public: typedef fc::shared_ptr< promise > ptr; - promise( const char* desc = "?" ):promise_base(desc){} - promise( const void_t& ){ set_value(); } + promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){} + //promise( const void_t& ){ set_value(); } void wait(const microseconds& timeout = microseconds::maximum() ){ this->_wait( timeout ); @@ -211,7 +218,13 @@ namespace fc { if( valid() ) { cancel(); - wait(); + try + { + wait(); + } + catch (const canceled_exception&) + { + } } } @@ -265,8 +278,14 @@ namespace fc { void cancel_and_wait() { - cancel(); - wait(); + cancel(); + try + { + wait(); + } + catch (const canceled_exception&) + { + } } /// @pre valid() diff --git a/include/fc/thread/task.hpp b/include/fc/thread/task.hpp index 5c1b30e..ac76cf8 100644 --- a/include/fc/thread/task.hpp +++ b/include/fc/thread/task.hpp @@ -10,7 +10,9 @@ namespace fc { class task_base : virtual public promise_base { public: - void run(); + void run(); + virtual void cancel() override; + protected: ~task_base(); /// Task priority looks like unsupported feature. @@ -61,7 +63,7 @@ namespace fc { class task : virtual public task_base, virtual public promise { public: template - task( Functor&& f ):task_base(&_functor) { + task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise(desc) { typedef typename fc::deduce::type FunctorType; static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" ); new ((char*)&_functor) FunctorType( fc::forward(f) ); @@ -74,11 +76,12 @@ namespace fc { private: ~task(){} }; + template class task : virtual public task_base, virtual public promise { public: template - task( Functor&& f ):task_base(&_functor) { + task( Functor&& f, const char* desc ):promise_base(desc), task_base(&_functor), promise(desc) { typedef typename fc::deduce::type FunctorType; static_assert( sizeof(f) <= sizeof(_functor), "sizeof(Functor) is larger than FunctorSize" ); new ((char*)&_functor) FunctorType( fc::forward(f) ); @@ -93,4 +96,3 @@ namespace fc { }; } - diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index a133ba4..e6f01c8 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -52,13 +52,13 @@ namespace fc { * @param prio the priority relative to other tasks */ template - auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future { + auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { typedef decltype(f()) Result; typedef typename fc::deduce::type FunctorType; fc::task* tsk = - new fc::task( fc::forward(f) ); + new fc::task( fc::forward(f), desc ); fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - async_task(tsk,prio,desc); + async_task(tsk,prio); return r; } void poke(); @@ -75,12 +75,12 @@ namespace fc { */ template auto schedule( Functor&& f, const fc::time_point& when, - const char* desc = "", priority prio = priority()) -> fc::future { + const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { typedef decltype(f()) Result; fc::task* tsk = - new fc::task( fc::forward(f) ); + new fc::task( fc::forward(f), desc ); fc::future r(fc::shared_ptr< fc::promise >(tsk,true) ); - async_task(tsk,prio,when,desc); + async_task(tsk,prio,when); return r; } @@ -133,8 +133,8 @@ namespace fc { void exec(); int wait_any_until( std::vector&& v, const time_point& ); - void async_task( task_base* t, const priority& p, const char* desc ); - void async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ); + void async_task( task_base* t, const priority& p ); + void async_task( task_base* t, const priority& p, const time_point& tp ); class thread_d* my; }; @@ -172,11 +172,11 @@ namespace fc { int wait_any_until( std::vector&& v, const time_point& tp ); template - auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future { + auto async( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { return fc::thread::current().async( fc::forward(f), desc, prio ); } template - auto schedule( Functor&& f, const fc::time_point& t, const char* desc ="", priority prio = priority()) -> fc::future { + auto schedule( Functor&& f, const fc::time_point& t, const char* desc FC_TASK_NAME_DEFAULT_ARG, priority prio = priority()) -> fc::future { return fc::thread::current().schedule( fc::forward(f), t, desc, prio ); } diff --git a/include/fc/time.hpp b/include/fc/time.hpp index ae0d48d..22ace58 100644 --- a/include/fc/time.hpp +++ b/include/fc/time.hpp @@ -109,6 +109,9 @@ namespace fc { friend microseconds operator - ( const time_point_sec& t, const time_point_sec& m ) { return time_point(t) - time_point(m); } friend microseconds operator - ( const time_point& t, const time_point_sec& m ) { return time_point(t) - time_point(m); } + fc::string to_iso_string()const; + fc::string to_iso_extended_string()const; + private: uint32_t utc_seconds; }; diff --git a/include/fc/variant.hpp b/include/fc/variant.hpp index 848cb6a..973d762 100644 --- a/include/fc/variant.hpp +++ b/include/fc/variant.hpp @@ -31,14 +31,20 @@ namespace fc class time_point_sec; class microseconds; - void to_variant( const int16_t& var, variant& vo ); - void from_variant( const variant& var, int16_t& vo ); - void to_variant( const uint16_t& var, variant& vo ); - void from_variant( const variant& var, uint16_t& vo ); - void to_variant( const uint32_t& var, variant& vo ); - void from_variant( const variant& var, uint32_t& vo ); void to_variant( const uint8_t& var, variant& vo ); void from_variant( const variant& var, uint8_t& vo ); + void to_variant( const int8_t& var, variant& vo ); + void from_variant( const variant& var, int8_t& vo ); + + void to_variant( const uint16_t& var, variant& vo ); + void from_variant( const variant& var, uint16_t& vo ); + void to_variant( const int16_t& var, variant& vo ); + void from_variant( const variant& var, int16_t& vo ); + + void to_variant( const uint32_t& var, variant& vo ); + void from_variant( const variant& var, uint32_t& vo ); + void to_variant( const int32_t& var, variant& vo ); + void from_variant( const variant& var, int32_t& vo ); void to_variant( const variant_object& var, variant& vo ); void from_variant( const variant& var, variant_object& vo ); @@ -94,7 +100,6 @@ namespace fc template void from_variant( const variant& v, std::pair& p ); - /** * @brief stores null, int64, uint64, double, bool, string, std::vector, * and variant_object's. @@ -129,11 +134,15 @@ namespace fc variant( char* str ); variant( wchar_t* str ); variant( const wchar_t* str ); - variant( int val ); variant( float val ); + variant( uint8_t val ); + variant( int8_t val ); + variant( uint16_t val ); + variant( int16_t val ); variant( uint32_t val ); - variant( int64_t val ); + variant( int32_t val ); variant( uint64_t val ); + variant( int64_t val ); variant( double val ); variant( bool val ); variant( fc::string val ); diff --git a/src/asio.cpp b/src/asio.cpp index e618c2d..befa2ed 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -118,22 +118,32 @@ namespace fc { } namespace tcp { - std::vector resolve( const std::string& hostname, const std::string& port) { - resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >() ); - res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), - boost::bind( detail::resolve_handler, p, _1, _2 ) ); - return p->wait();; + std::vector resolve( const std::string& hostname, const std::string& port) + { + try + { + resolver res( fc::asio::default_io_service() ); + promise >::ptr p( new promise >("tcp::resolve completion") ); + res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port), + boost::bind( detail::resolve_handler, p, _1, _2 ) ); + return p->wait();; } + FC_RETHROW_EXCEPTIONS(warn, "") + } } namespace udp { - std::vector resolve( resolver& r, const std::string& hostname, const std::string& port) { - resolver res( fc::asio::default_io_service() ); - promise >::ptr p( new promise >() ); - res.async_resolve( resolver::query(hostname,port), - boost::bind( detail::resolve_handler, p, _1, _2 ) ); - return p->wait(); + std::vector resolve( resolver& r, const std::string& hostname, const std::string& port) + { + try + { + resolver res( fc::asio::default_io_service() ); + promise >::ptr p( new promise >("udp::resolve completion") ); + res.async_resolve( resolver::query(hostname,port), + boost::bind( detail::resolve_handler, p, _1, _2 ) ); + return p->wait(); } + FC_RETHROW_EXCEPTIONS(warn, "") + } } } } // namespace fc::asio diff --git a/src/crypto/openssl.cpp b/src/crypto/openssl.cpp index 36a3889..e3044ac 100644 --- a/src/crypto/openssl.cpp +++ b/src/crypto/openssl.cpp @@ -49,12 +49,6 @@ namespace fc int init_openssl() { - auto strAppDir = current_path(); - fc::path appDir(strAppDir); - fc::path openSSLConf = appDir / "openssl.cnf"; - if (fc::exists(openSSLConf)) - fc::store_configuration_path(openSSLConf); - static openssl_scope ossl; return 0; } diff --git a/src/io/iostream.cpp b/src/io/iostream.cpp index 5a01cc7..6eef009 100644 --- a/src/io/iostream.cpp +++ b/src/io/iostream.cpp @@ -17,7 +17,7 @@ namespace fc { struct cin_buffer { cin_buffer():eof(false),write_pos(0),read_pos(0),cinthread("cin"){ - cinthread.async( [=](){read();} ); + cinthread.async( [=](){read();}, "cin_buffer::read" ); } void read() { @@ -25,7 +25,7 @@ namespace fc { std::cin.read(&c,1); while( !std::cin.eof() ) { while( write_pos - read_pos > 0xfffff ) { - fc::promise::ptr wr( new fc::promise() ); + fc::promise::ptr wr( new fc::promise("cin_buffer::write_ready") ); write_ready = wr; if( write_pos - read_pos <= 0xfffff ) { wr->wait(); @@ -141,7 +141,7 @@ namespace fc { do { while( !b.eof && (b.write_pos - b.read_pos)==0 ){ // wait for more... - fc::promise::ptr rr( new fc::promise() ); + fc::promise::ptr rr( new fc::promise("cin_buffer::read_ready") ); { // copy read_ready because it is accessed from multiple threads fc::scoped_lock lock( b.read_ready_mutex ); b.read_ready = rr; diff --git a/src/log/file_appender.cpp b/src/log/file_appender.cpp index 686ed65..014fd09 100644 --- a/src/log/file_appender.cpp +++ b/src/log/file_appender.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -29,15 +28,14 @@ namespace fc { time_point_sec get_file_start_time( const time_point_sec& timestamp, const microseconds& interval ) { - const auto interval_seconds = interval.to_seconds(); - const auto file_number = timestamp.sec_since_epoch() / interval_seconds; - return time_point_sec( file_number * interval_seconds ); + int64_t interval_seconds = interval.to_seconds(); + int64_t file_number = timestamp.sec_since_epoch() / interval_seconds; + return time_point_sec( (uint32_t)(file_number * interval_seconds) ); } string timestamp_to_string( const time_point_sec& timestamp ) { - auto ptime = boost::posix_time::from_time_t( time_t ( timestamp.sec_since_epoch() ) ); - return boost::posix_time::to_iso_string( ptime ); + return timestamp.to_iso_string(); } time_point_sec string_to_timestamp( const string& str ) @@ -51,7 +49,7 @@ namespace fc { FC_ASSERT( _compression_thread ); if( !_compression_thread->is_current() ) { - _compression_thread->async( [this, filename]() { compress_file( filename ); } ).wait(); + _compression_thread->async( [this, filename]() { compress_file( filename ); }, "compress_file" ).wait(); return; } @@ -76,20 +74,28 @@ namespace fc { if( cfg.rotation_compression ) _compression_thread.reset( new thread( "compression") ); - _rotation_task = async( [this]() { rotate_files( true ); } ); + _rotation_task = async( [this]() { rotate_files( true ); }, "rotate_files(1)" ); } } ~impl() { - try - { - _rotation_task.cancel_and_wait(); - if( _compression_thread ) _compression_thread->quit(); - } - catch( ... ) - { - } + try + { + _rotation_task.cancel_and_wait(); + } + catch( ... ) + { + } + + try + { + if( _compression_thread ) + _compression_thread->quit(); + } + catch( ... ) + { + } } void rotate_files( bool initializing = false ) @@ -108,7 +114,7 @@ namespace fc { { if( start_time <= _current_file_start_time ) { - _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds() ); + _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "rotate_files(2)" ); return; } @@ -154,7 +160,7 @@ namespace fc { } _current_file_start_time = start_time; - _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds() ); + _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds(), "rotate_files(3)" ); } }; file_appender::config::config( const fc::path& p ) diff --git a/src/network/http/http_server.cpp b/src/network/http/http_server.cpp index b9c7302..6560cd4 100644 --- a/src/network/http/http_server.cpp +++ b/src/network/http/http_server.cpp @@ -50,7 +50,7 @@ namespace fc { namespace http { impl(){} impl(const fc::ip::endpoint& p ) { tcp_serv.listen(p); - accept_complete = fc::async([this](){ this->accept_loop(); }); + accept_complete = fc::async([this](){ this->accept_loop(); }, "http_server accept_loop"); } fc::future accept_complete; ~impl() { @@ -66,7 +66,7 @@ namespace fc { namespace http { http::connection_ptr con = std::make_shared(); tcp_serv.accept( con->get_socket() ); //ilog( "Accept Connection" ); - fc::async( [=](){ handle_connection( con, on_req ); } ); + fc::async( [=](){ handle_connection( con, on_req ); }, "http_server handle_connection" ); } } @@ -144,7 +144,7 @@ namespace fc { namespace http { if( false || my->handle_next_req ) { ilog( "handle next request..." ); //fc::async( std::function(my->handle_next_req) ); - fc::async( my->handle_next_req ); + fc::async( my->handle_next_req, "http_server handle_next_req" ); } } } diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index c938edf..3cf4757 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -7,6 +7,7 @@ #include #include "../byteswap.hpp" +#include #include namespace fc @@ -16,22 +17,32 @@ namespace fc class ntp_impl { public: - ntp_impl():_request_interval_sec( 60*60 /* 1 hr */),_ntp_thread("ntp") - { - _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); - } - /** vector < host, port > */ std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; - fc::future _read_loop; + fc::future _read_loop_done; udp_socket _sock; uint32_t _request_interval_sec; fc::time_point _last_request_time; - optional _last_ntp_delta; + + std::atomic_bool _last_ntp_delta_initialized; + std::atomic _last_ntp_delta_microseconds; + fc::thread _ntp_thread; + fc::future _request_time_task_done; + + ntp_impl() : + _request_interval_sec( 60*60 /* 1 hr */), + _last_ntp_delta_microseconds(0), + _ntp_thread("ntp") + { + _last_ntp_delta_initialized = false; + _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); + } + void request_now() { + assert(_ntp_thread.is_current()); for( auto item : _ntp_hosts ) { try @@ -44,26 +55,33 @@ namespace fc std::array send_buf { {010,0,0,0,0,0,0,0,0} }; _last_request_time = fc::time_point::now(); _sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep ); + if (!_read_loop_done.valid() || _read_loop_done.ready()) + _read_loop_done = async( [this](){ read_loop(); }, "ntp_read_loop" ); break; } } // this could fail to resolve but we want to go on to other hosts.. catch ( const fc::exception& e ) { - elog( "${e}", ("e",e.to_detail_string() ) ); + elog( "${e}", ("e",e.to_detail_string() ) ); } } } // request_now - void request_time() + //started for first time in ntp() constructor, canceled in ~ntp() destructor + void request_time_task() { + assert(_ntp_thread.is_current()); request_now(); - _ntp_thread.schedule( [=](){ request_time(); }, fc::time_point::now() + fc::seconds(_request_interval_sec) ); + _request_time_task_done = schedule( [=](){ request_time_task(); }, + fc::time_point::now() + fc::seconds(_request_interval_sec), + "request_time_task" ); } // request_loop void read_loop() { - while( !_read_loop.canceled() ) + assert(_ntp_thread.is_current()); + while( !_read_loop_done.canceled() ) { fc::ip::endpoint from; std::array recv_buf; @@ -87,7 +105,8 @@ namespace fc if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) { - _last_ntp_delta = ntp_time - fc::time_point::now(); + _last_ntp_delta_microseconds = (ntp_time - fc::time_point::now()).count(); + _last_ntp_delta_initialized = true; } else elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); @@ -105,29 +124,51 @@ namespace fc :my( new detail::ntp_impl() ) { my->_sock.open(); - - my->_ntp_thread.async( [=](){ my->request_time(); } ); - my->_read_loop = my->_ntp_thread.async( [=](){ my->read_loop(); } ); + // if you start the read loop here, the recieve_from call will throw "invalid argument" on win32, + // so instead we trigger the read loop after making our first request + my->_request_time_task_done = my->_ntp_thread.async( [=](){ my->request_time_task(); }, "request_time_task" ); } ntp::~ntp() { - try { - my->_read_loop.cancel(); + my->_ntp_thread.async([=](){ + try + { + my->_request_time_task_done.cancel_and_wait(); + } + catch ( const fc::exception& e ) + { + wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring: ${e}", ("e",e) ); + } + catch (...) + { + wlog( "Exception thrown while shutting down NTP's request_time_task, ignoring" ); + } + + try + { + my->_read_loop_done.cancel(); my->_sock.close(); - my->_read_loop.wait(); - } - catch ( const fc::exception& ) - { - // we expect canceled exceptions, but cannot throw - // from destructor - } + my->_read_loop_done.wait(); + } + catch ( const fc::canceled_exception& ) + { + } + catch ( const fc::exception& e ) + { + wlog( "Exception thrown while shutting down NTP's read_loop, ignoring: ${e}", ("e",e) ); + } + catch (...) + { + wlog( "Exception thrown while shutting down NTP's read_loop, ignoring" ); + } + }, "ntp_shutdown_task").wait(); } void ntp::add_server( const std::string& hostname, uint16_t port) { - my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); + my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }, "add_server" ).wait(); } void ntp::set_request_interval( uint32_t interval_sec ) @@ -137,13 +178,13 @@ namespace fc void ntp::request_now() { - my->request_now(); + my->_ntp_thread.async( [=](){ my->request_now(); }, "request_now" ).wait(); } optional ntp::get_time()const { - if( my->_last_ntp_delta ) - return fc::time_point::now() + *my->_last_ntp_delta; + if( my->_last_ntp_delta_initialized ) + return fc::time_point::now() + fc::microseconds(my->_last_ntp_delta_microseconds); return optional(); } diff --git a/src/network/rate_limiting.cpp b/src/network/rate_limiting.cpp index fe69277..aac30ae 100644 --- a/src/network/rate_limiting.cpp +++ b/src/network/rate_limiting.cpp @@ -207,13 +207,13 @@ namespace fc size_t bytes_read; if (_download_bytes_per_second) { - promise::ptr completion_promise(new promise()); + promise::ptr completion_promise(new promise("rate_limiting_group_impl::readsome")); rate_limited_tcp_read_operation read_operation(socket, buffer, length, completion_promise); _read_operations_for_next_iteration.push_back(&read_operation); // launch the read processing loop it if isn't running, or signal it to resume if it's paused. if (!_process_pending_reads_loop_complete.valid() || _process_pending_reads_loop_complete.ready()) - _process_pending_reads_loop_complete = async([=](){ process_pending_reads(); }); + _process_pending_reads_loop_complete = async([=](){ process_pending_reads(); }, "process_pending_reads" ); else if (_new_read_operation_available_promise) _new_read_operation_available_promise->set_value(); @@ -232,13 +232,13 @@ namespace fc size_t bytes_written; if (_upload_bytes_per_second) { - promise::ptr completion_promise(new promise()); + promise::ptr completion_promise(new promise("rate_limiting_group_impl::writesome")); rate_limited_tcp_write_operation write_operation(socket, buffer, length, completion_promise); _write_operations_for_next_iteration.push_back(&write_operation); // launch the write processing loop it if isn't running, or signal it to resume if it's paused. if (!_process_pending_writes_loop_complete.valid() || _process_pending_writes_loop_complete.ready()) - _process_pending_writes_loop_complete = async([=](){ process_pending_writes(); }); + _process_pending_writes_loop_complete = async([=](){ process_pending_writes(); }, "process_pending_writes"); else if (_new_write_operation_available_promise) _new_write_operation_available_promise->set_value(); @@ -259,7 +259,7 @@ namespace fc process_pending_operations(_last_read_iteration_time, _download_bytes_per_second, _read_operations_in_progress, _read_operations_for_next_iteration, _read_tokens, _unused_read_tokens); - _new_read_operation_available_promise = new promise(); + _new_read_operation_available_promise = new promise("rate_limiting_group_impl::process_pending_reads"); try { if (_read_operations_in_progress.empty()) @@ -280,7 +280,7 @@ namespace fc process_pending_operations(_last_write_iteration_time, _upload_bytes_per_second, _write_operations_in_progress, _write_operations_for_next_iteration, _write_tokens, _unused_write_tokens); - _new_write_operation_available_promise = new promise(); + _new_write_operation_available_promise = new promise("rate_limiting_group_impl::process_pending_writes"); try { if (_write_operations_in_progress.empty()) diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp index a065aac..0018238 100644 --- a/src/network/udt_socket.cpp +++ b/src/network/udt_socket.cpp @@ -31,7 +31,7 @@ namespace fc { UDT::startup(); check_udt_errors(); _epoll_id = UDT::epoll_create(); - _epoll_loop = _epoll_thread.async( [=](){ poll_loop(); } ); + _epoll_loop = _epoll_thread.async( [=](){ poll_loop(); }, "udt_poll_loop" ); } ~udt_epoll_service() @@ -194,7 +194,7 @@ namespace fc { connect_thread.async( [&](){ if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) ) check_udt_errors(); - }).wait(); + }, "udt_socket::connect_to").wait(); bool block = false; UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool)); diff --git a/src/rpc/json_connection.cpp b/src/rpc/json_connection.cpp index fa01fd0..06d3fa4 100644 --- a/src/rpc/json_connection.cpp +++ b/src/rpc/json_connection.cpp @@ -204,7 +204,7 @@ namespace fc { namespace rpc { variant v = json::from_stream(*_in); ///ilog( "input: ${in}", ("in", v ) ); //wlog( "recv: ${line}", ("line", line) ); - fc::async([=](){ handle_message(v.get_object()); }); + fc::async([=](){ handle_message(v.get_object()); }, "json_connection handle_message"); } } catch ( eof_exception& eof ) @@ -263,7 +263,7 @@ namespace fc { namespace rpc { { FC_THROW_EXCEPTION( assert_exception, "start should only be called once" ); } - return my->_done = fc::async( [=](){ my->read_loop(); } ); + return my->_done = fc::async( [=](){ my->read_loop(); }, "json_connection read_loop" ); } void json_connection::add_method( const fc::string& name, method m ) @@ -324,7 +324,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variants& args ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -350,7 +350,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -368,7 +368,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -388,7 +388,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -411,7 +411,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -436,7 +436,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -463,7 +463,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -491,7 +491,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3, const variant& a4, const variant& a5, const variant& a6, const variant& a7 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -525,7 +525,7 @@ namespace fc { namespace rpc { const variant& a4, const variant& a5, const variant& a6, const variant& a7, const variant& a8 ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); { fc::scoped_lock lock(my->_write_mutex); @@ -562,7 +562,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant_object& named_args ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); fc::scoped_lock lock(my->_write_mutex); { *my->_out << "{\"id\":"; @@ -579,7 +579,7 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method ) { auto id = my->_next_id++; - my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + my->_awaiting[id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); fc::scoped_lock lock(my->_write_mutex); { *my->_out << "{\"id\":"; diff --git a/src/thread/context.hpp b/src/thread/context.hpp index 35d83d7..4ab0dd4 100644 --- a/src/thread/context.hpp +++ b/src/thread/context.hpp @@ -52,7 +52,6 @@ namespace fc { cur_task(0) { #if BOOST_VERSION >= 105400 - bco::stack_context stack_ctx; size_t stack_size = bco::stack_allocator::default_stacksize() * 4; alloc.allocate(stack_ctx, stack_size); my_context = bc::make_fcontext( stack_ctx.sp, stack_ctx.size, sf); diff --git a/src/thread/task.cpp b/src/thread/task.cpp index c3e1feb..3188145 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -3,6 +3,7 @@ #include #include #include +#include "context.hpp" #include #include @@ -15,6 +16,7 @@ namespace fc { task_base::task_base(void* func) : + promise_base("task_base"), _posted_num(0), _active_context(nullptr), _next(nullptr), @@ -33,6 +35,7 @@ namespace fc { } #endif } + void task_base::run_impl() { try { if( !canceled() ) @@ -49,6 +52,16 @@ namespace fc { set_exception( std::make_shared( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) ); } } + + void task_base::cancel() + { + promise_base::cancel(); + if (_active_context) + { + _active_context->canceled = true; + } + } + task_base::~task_base() { _destroy_functor( _functor ); } diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 6968417..260907e 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -71,7 +71,7 @@ namespace fc { } thread::thread( const std::string& name ) { - promise::ptr p(new promise()); + promise::ptr p(new promise("thread start")); boost::thread* t = new boost::thread( [this,p,name]() { try { set_thread_name(name.c_str()); // set thread's name for the debugger to display @@ -129,7 +129,7 @@ namespace fc { //if quitting from a different thread, start quit task on thread. //If we have and know our attached boost thread, wait for it to finish, then return. if( ¤t() != this ) { - async( [=](){quit();} );//.wait(); + async( [=](){quit();}, "thread::quit" );//.wait(); if( my->boost_thread ) { auto n = name(); my->boost_thread->join(); @@ -139,7 +139,7 @@ namespace fc { return; } - wlog( "${s}", ("s",name()) ); + //wlog( "${s}", ("s",name()) ); // We are quiting from our own thread... // break all promises, thread quit! @@ -155,7 +155,7 @@ namespace fc { cur = n; } if( my->blocked ) { - wlog( "still blocking... whats up with that?"); + //wlog( "still blocking... whats up with that?"); debug( "on quit" ); } } @@ -277,8 +277,8 @@ namespace fc { return -1; } - void thread::async_task( task_base* t, const priority& p, const char* desc ) { - async_task( t, p, time_point::min(), desc ); + void thread::async_task( task_base* t, const priority& p ) { + async_task( t, p, time_point::min() ); } void thread::poke() { @@ -286,7 +286,7 @@ namespace fc { my->task_ready.notify_one(); } - void thread::async_task( task_base* t, const priority& p, const time_point& tp, const char* desc ) { + void thread::async_task( task_base* t, const priority& p, const time_point& tp ) { assert(my); t->_when = tp; // slog( "when %lld", t->_when.time_since_epoch().count() ); diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 314e0a4..f533dd7 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -487,7 +487,7 @@ namespace fc { void unblock( fc::context* c ) { if( fc::thread::current().my != this ) { - self.async( [=](){ unblock(c); } ); + self.async( [=](){ unblock(c); }, "thread_d::unblock" ); return; } if( c != current ) ready_push_front(c); diff --git a/src/time.cpp b/src/time.cpp index dc50544..06c4494 100644 --- a/src/time.cpp +++ b/src/time.cpp @@ -29,6 +29,19 @@ namespace fc { } FC_RETHROW_EXCEPTIONS(warn, "unable to convert ISO-formatted string to fc::time_point") } + + fc::string time_point_sec::to_iso_string()const + { + const auto ptime = boost::posix_time::from_time_t( time_t ( sec_since_epoch() ) ); + return boost::posix_time::to_iso_string( ptime ); + } + + fc::string time_point_sec::to_iso_extended_string()const + { + const auto ptime = boost::posix_time::from_time_t( time_t ( sec_since_epoch() ) ); + return boost::posix_time::to_iso_extended_string( ptime ); + } + void to_variant( const fc::time_point& t, variant& v ) { v = fc::string(t); } diff --git a/src/variant.cpp b/src/variant.cpp index 5ef005d..938c0c5 100644 --- a/src/variant.cpp +++ b/src/variant.cpp @@ -12,34 +12,6 @@ namespace fc { - - void to_variant( const uint16_t& var, variant& vo ) { vo = uint64_t(var); } - // TODO: warn on overflow? - void from_variant( const variant& var, uint16_t& vo ){ vo = static_cast(var.as_uint64()); } - - void to_variant( const int16_t& var, variant& vo ) { vo = int64_t(var); } - // TODO: warn on overflow? - void from_variant( const variant& var, int16_t& vo ){ vo = static_cast(var.as_int64()); } - -void to_variant( const std::vector& var, variant& vo ) -{ - if( var.size() ) - //vo = variant(base64_encode((unsigned char*)var.data(),var.size())); - vo = variant(to_hex(var.data(),var.size())); - else vo = ""; -} -void from_variant( const variant& var, std::vector& vo ) -{ - auto str = var.as_string(); - vo.resize( str.size() / 2 ); - if( vo.size() ) - { - size_t r = from_hex( str, vo.data(), vo.size() ); - FC_ASSERT( r = vo.size() ); - } -// std::string b64 = base64_decode( var.as_string() ); -// vo = std::vector( b64.c_str(), b64.c_str() + b64.size() ); -} /** * The TypeID is stored in the 'last byte' of the variant. */ @@ -59,35 +31,60 @@ variant::variant( fc::nullptr_t ) set_variant_type( this, null_type ); } -variant::variant( uint32_t val ) +variant::variant( uint8_t val ) +{ + *reinterpret_cast(this) = val; + set_variant_type( this, uint64_type ); +} + +variant::variant( int8_t val ) { *reinterpret_cast(this) = val; set_variant_type( this, int64_type ); } +variant::variant( uint16_t val ) +{ + *reinterpret_cast(this) = val; + set_variant_type( this, uint64_type ); +} + +variant::variant( int16_t val ) +{ + *reinterpret_cast(this) = val; + set_variant_type( this, int64_type ); +} + +variant::variant( uint32_t val ) +{ + *reinterpret_cast(this) = val; + set_variant_type( this, uint64_type ); +} + +variant::variant( int32_t val ) +{ + *reinterpret_cast(this) = val; + set_variant_type( this, int64_type ); +} + +variant::variant( uint64_t val ) +{ + *reinterpret_cast(this) = val; + set_variant_type( this, uint64_type ); +} + variant::variant( int64_t val ) { *reinterpret_cast(this) = val; set_variant_type( this, int64_type ); } -variant::variant( int val ) -{ - *reinterpret_cast(this) = val; - set_variant_type( this, int64_type ); -} variant::variant( float val ) { *reinterpret_cast(this) = val; set_variant_type( this, double_type ); } -variant::variant( uint64_t val ) -{ - *reinterpret_cast(this) = val; - set_variant_type( this, uint64_type ); -} - variant::variant( double val ) { *reinterpret_cast(this) = val; @@ -496,7 +493,6 @@ const string& variant::get_string()const FC_THROW_EXCEPTION( bad_cast_exception, "Invalid cast from type '${type}' to Object", ("type",get_type()) ); } - /// @throw if get_type() != object_type const variant_object& variant::get_object()const { @@ -505,27 +501,45 @@ const variant_object& variant::get_object()const FC_THROW_EXCEPTION( bad_cast_exception, "Invalid cast from type '${type}' to Object", ("type",get_type()) ); } -void to_variant( const std::string& s, variant& v ) +void from_variant( const variant& var, variants& vo ) { - v = variant( fc::string(s) ); + vo = var.get_array(); } //void from_variant( const variant& var, variant_object& vo ) //{ // vo = var.get_object(); //} -void from_variant( const variant& var, string& vo ) -{ - vo = var.as_string(); -} - -void from_variant( const variant& var, variants& vo ) -{ - vo = var.get_array(); -} void from_variant( const variant& var, variant& vo ) { vo = var; } +void to_variant( const uint8_t& var, variant& vo ) { vo = uint64_t(var); } +// TODO: warn on overflow? +void from_variant( const variant& var, uint8_t& vo ){ vo = static_cast(var.as_uint64()); } + +void to_variant( const int8_t& var, variant& vo ) { vo = int64_t(var); } +// TODO: warn on overflow? +void from_variant( const variant& var, int8_t& vo ){ vo = static_cast(var.as_int64()); } + +void to_variant( const uint16_t& var, variant& vo ) { vo = uint64_t(var); } +// TODO: warn on overflow? +void from_variant( const variant& var, uint16_t& vo ){ vo = static_cast(var.as_uint64()); } + +void to_variant( const int16_t& var, variant& vo ) { vo = int64_t(var); } +// TODO: warn on overflow? +void from_variant( const variant& var, int16_t& vo ){ vo = static_cast(var.as_int64()); } + +void to_variant( const uint32_t& var, variant& vo ) { vo = uint64_t(var); } +void from_variant( const variant& var, uint32_t& vo ) +{ + vo = static_cast(var.as_uint64()); +} + +void from_variant( const variant& var, int32_t& vo ) +{ + vo = static_cast(var.as_int64()); +} + void from_variant( const variant& var, int64_t& vo ) { vo = var.as_int64(); @@ -551,20 +565,34 @@ void from_variant( const variant& var, float& vo ) vo = static_cast(var.as_double()); } -void from_variant( const variant& var, int32_t& vo ) +void to_variant( const std::string& s, variant& v ) { - vo = static_cast(var.as_int64()); + v = variant( fc::string(s) ); } -void to_variant( const uint32_t& var, variant& vo ) { vo = uint64_t(var); } -void from_variant( const variant& var, uint32_t& vo ) +void from_variant( const variant& var, string& vo ) { - vo = static_cast(var.as_uint64()); + vo = var.as_string(); } -void to_variant( const uint8_t& var, variant& vo ) { vo = uint64_t(var); } -void from_variant( const variant& var, uint8_t& vo ) + +void to_variant( const std::vector& var, variant& vo ) { - vo = static_cast(var.as_uint64()); + if( var.size() ) + //vo = variant(base64_encode((unsigned char*)var.data(),var.size())); + vo = variant(to_hex(var.data(),var.size())); + else vo = ""; +} +void from_variant( const variant& var, std::vector& vo ) +{ + auto str = var.as_string(); + vo.resize( str.size() / 2 ); + if( vo.size() ) + { + size_t r = from_hex( str, vo.data(), vo.size() ); + FC_ASSERT( r = vo.size() ); + } +// std::string b64 = base64_decode( var.as_string() ); +// vo = std::vector( b64.c_str(), b64.c_str() + b64.size() ); } string format_string( const string& format, const variant_object& args ) diff --git a/tests/task_cancel.cpp b/tests/task_cancel.cpp index e63544c..c0c76f6 100644 --- a/tests/task_cancel.cpp +++ b/tests/task_cancel.cpp @@ -19,7 +19,7 @@ BOOST_AUTO_TEST_CASE( cancel_an_active_task ) { return sleep_aborted; } - }); + }, "test_task"); fc::time_point start_time = fc::time_point::now(); @@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task ) { BOOST_TEST_MESSAGE("Caught exception in async task, leaving the task's functor"); } - }); + }, "test_task"); std::weak_ptr weak_string_ptr(some_string); some_string.reset(); BOOST_CHECK_MESSAGE(!weak_string_ptr.expired(), "Weak pointer should still be valid because async task should be holding the strong pointer"); @@ -75,18 +75,30 @@ BOOST_AUTO_TEST_CASE( cleanup_cancelled_task ) BOOST_CHECK_MESSAGE(weak_string_ptr.expired(), "Weak pointer should now be invalid because async task should have been destroyed"); } +int task_execute_count = 0; +fc::future simple_task_done; +void simple_task() +{ + task_execute_count++; + simple_task_done = fc::schedule([](){ simple_task(); }, + fc::time_point::now() + fc::seconds(3), + "simple_task"); +} + BOOST_AUTO_TEST_CASE( cancel_scheduled_task ) { bool task_executed = false; try { - auto result = fc::schedule( [&]() { task_executed = true; }, fc::time_point::now() + fc::seconds(3) ); - result.cancel(); - result.wait(); + simple_task(); + simple_task(); + fc::usleep(fc::seconds(4)); + simple_task_done.cancel(); + simple_task_done.wait(); } catch ( const fc::exception& e ) { wlog( "${e}", ("e",e.to_detail_string() ) ); } - BOOST_CHECK(!task_executed); + BOOST_CHECK_EQUAL(task_execute_count, 2); } \ No newline at end of file