From e9b1b8ec2eb55aa920466d4774bf9c2619b3c556 Mon Sep 17 00:00:00 2001 From: Vikram Rajkumar Date: Thu, 3 Jul 2014 10:02:40 -0400 Subject: [PATCH] Use a separate thread for log compression --- include/fc/log/file_appender.hpp | 2 - include/fc/thread/thread.hpp | 2 +- src/compress/lzma.cpp | 8 +- src/log/file_appender.cpp | 129 ++++++++++++++++++++++--------- src/thread/thread.cpp | 8 +- 5 files changed, 101 insertions(+), 48 deletions(-) diff --git a/include/fc/log/file_appender.hpp b/include/fc/log/file_appender.hpp index 5963692..af7aa62 100644 --- a/include/fc/log/file_appender.hpp +++ b/include/fc/log/file_appender.hpp @@ -7,8 +7,6 @@ namespace fc { -class varaint; - class file_appender : public appender { public: struct config { diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index e0863ad..a133ba4 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -9,7 +9,7 @@ namespace fc { class thread { public: - thread( const char* name = "" ); + thread( const std::string& name = "" ); thread( thread&& m ); thread& operator=(thread&& t ); diff --git a/src/compress/lzma.cpp b/src/compress/lzma.cpp index d69cc86..fb669d0 100644 --- a/src/compress/lzma.cpp +++ b/src/compress/lzma.cpp @@ -1,9 +1,11 @@ -#include #include #include #include +#include #include +#include + namespace fc { std::vector lzma_compress(const std::vector& in) @@ -91,8 +93,8 @@ static size_t lzma_file_output_callback( void* output_ctx, const void* output_bu size_t dst_len = 0; if( !exists( ctx->dst_path ) ) { - boost::filesystem::ofstream ofs( ctx->dst_path.string() ); - ofs.close(); + ofstream fs( ctx->dst_path ); + fs.close(); } else { diff --git a/src/log/file_appender.cpp b/src/log/file_appender.cpp index 203ecb1..e012b5a 100644 --- a/src/log/file_appender.cpp +++ b/src/log/file_appender.cpp @@ -1,21 +1,31 @@ #include +#include #include #include #include #include +#include #include #include #include #include +#include #include namespace fc { + + static const string compression_extension( ".lzma" ); + class file_appender::impl : public fc::retainable { public: - config cfg; - ofstream out; - boost::mutex slock; - time_point_sec current_file_start_time; + config cfg; + ofstream out; + boost::mutex slock; + + private: + future _rotation_task; + time_point_sec _current_file_start_time; + std::unique_ptr _compression_thread; time_point_sec get_file_start_time( const time_point_sec& timestamp, const microseconds& interval ) { @@ -35,45 +45,96 @@ namespace fc { return time_point::from_iso_string( str ); } + void compress_file( const string& filename ) + { + FC_ASSERT( cfg.rotate && cfg.rotation_compression ); + FC_ASSERT( _compression_thread ); + if( !_compression_thread->is_current() ) + { + _compression_thread->async( [this, filename]() { compress_file( filename ); } ).wait(); + return; + } + + try + { + lzma_compress_file( filename, filename + compression_extension ); + remove_all( filename ); + } + catch( ... ) + { + } + } + + public: + impl( const config& c) : cfg( c ) + { + if( cfg.rotate ) + { + if( cfg.rotation_compression ) + _compression_thread.reset( new thread( "compression") ); + + _rotation_task = async( [this]() { rotate_files( true ); } ); + } + } + + ~impl() + { + try + { + _rotation_task.cancel_and_wait(); + if( _compression_thread ) _compression_thread->quit(); + } + catch( ... ) + { + } + } + void rotate_files( bool initializing = false ) { - if( !cfg.rotate ) return; + FC_ASSERT( cfg.rotate ); const auto now = time_point::now(); const auto start_time = get_file_start_time( now, cfg.rotation_interval ); + const auto timestamp_string = timestamp_to_string( start_time ); + const auto link_filename = cfg.filename.string(); + const auto log_filename = link_filename + "." + timestamp_string; + if( !initializing ) { - if( start_time <= current_file_start_time ) return; + if( start_time <= _current_file_start_time ) return; + fc::scoped_lock lock( slock ); + out.flush(); out.close(); } - auto log_filename = cfg.filename.string(); - const auto link_filename = log_filename; + out.open( log_filename.c_str() ); remove_all( link_filename ); - const auto timestamp_string = timestamp_to_string( start_time ); + create_hard_link( log_filename, link_filename ); /* Delete old log files */ const auto limit_time = now - cfg.rotation_limit; - auto itr = directory_iterator( fc::path( log_filename ).parent_path() ); + auto itr = directory_iterator( fc::path( link_filename ).parent_path() ); for( ; itr != directory_iterator(); itr++ ) { - const auto current_filename = itr->string(); - auto current_pos = current_filename.find( log_filename ); - if( current_pos != 0 ) continue; - current_pos = log_filename.size() + 1; - const auto current_timestamp = string( current_filename.begin() + current_pos, - current_filename.begin() + current_pos + timestamp_string.size() ); try { - if( string_to_timestamp( current_timestamp ) < limit_time ) + const auto current_filename = itr->string(); + auto current_pos = current_filename.find( link_filename ); + if( current_pos != 0 ) continue; + current_pos = link_filename.size() + 1; + const auto current_timestamp_str = string( current_filename.begin() + current_pos, /* substr not working */ + current_filename.begin() + current_pos + timestamp_string.size() ); + const auto current_timestamp = string_to_timestamp( current_timestamp_str ); + if( current_timestamp < start_time ) { - remove_all( current_filename ); - } - else if( cfg.rotation_compression ) - { - if( current_filename.substr( current_filename.size() - 3 ) == ".7z" ) continue; - /* TODO: Possibly move compression to another thread */ - lzma_compress_file( current_filename, current_filename + ".7z" ); - remove_all( current_filename ); + if( current_timestamp < limit_time || file_size( current_filename ) <= 0 ) + { + remove_all( current_filename ); + continue; + } + + if( !cfg.rotation_compression ) continue; + if( current_filename.find( compression_extension ) != string::npos ) continue; + compress_file( current_filename ); } } catch( ... ) @@ -81,34 +142,29 @@ namespace fc { } } - current_file_start_time = start_time; - log_filename += "." + timestamp_string; - out.open( log_filename.c_str() ); - create_hard_link( log_filename, link_filename ); + _current_file_start_time = start_time; + _rotation_task = schedule( [this]() { rotate_files(); }, now + cfg.rotation_interval ); } }; file_appender::config::config( const fc::path& p ) :format( "${timestamp} ${thread_name} ${context} ${file}:${line} ${method} ${level}] ${message}" ), - filename(p),flush(true),truncate(true),rotate(false),rotation_compression(false){} + filename(p),flush(true),truncate(true),rotate(false),rotation_compression(true){} file_appender::file_appender( const variant& args ) - :my( new impl() ) + :my( new impl( args.as() ) ) { std::string log_filename; try { - my->cfg = args.as(); log_filename = my->cfg.filename.string(); fc::create_directories( fc::path( log_filename ).parent_path() ); if( !my->cfg.rotate ) my->out.open( log_filename.c_str() ); - else my->rotate_files( true ); } catch( ... ) { std::cerr << "error opening log file: " << log_filename << "\n"; - //elog( "%s", fc::except_str().c_str() ); } } file_appender::~file_appender(){} @@ -142,12 +198,11 @@ namespace fc { // fc::string fmt_str = fc::format_string( my->cfg.format, mutable_variant_object(m.get_context())( "message", message) ); - /* Write to log file (rotating file beforehand if necessary) */ { fc::scoped_lock lock( my->slock ); - if( my->cfg.rotate ) my->rotate_files(); my->out << line.str() << "\t\t\t" << m.get_context().get_file() <<":"<cfg.flush ) my->out.flush(); } } -} + +} // fc diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 4d856fa..17016fa 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -70,11 +70,11 @@ namespace fc { return t; } - thread::thread( const char* name ) { + thread::thread( const std::string& name ) { promise::ptr p(new promise()); boost::thread* t = new boost::thread( [this,p,name]() { try { - set_thread_name(name); // set thread's name for the debugger to display + set_thread_name(name.c_str()); // set thread's name for the debugger to display this->my = new thread_d(*this); current_thread() = this; p->set_value(); @@ -126,17 +126,15 @@ namespace fc { void thread::debug( const fc::string& d ) { /*my->debug(d);*/ } void thread::quit() { - //if quiting from a different thread, start quit task on thread. + //if quitting from a different thread, start quit task on thread. //If we have and know our attached boost thread, wait for it to finish, then return. if( ¤t() != this ) { async( [=](){quit();} );//.wait(); if( my->boost_thread ) { auto n = name(); - ilog( "joining... ${n}", ("n",n) );//n.c_str() ); my->boost_thread->join(); delete my; my = nullptr; - ilog( "done joining...${n}", ("n",n) ); //n.c_str() ); } return; }