Use a separate thread for log compression

This commit is contained in:
Vikram Rajkumar 2014-07-03 10:02:40 -04:00
parent adf8c10ee5
commit e9b1b8ec2e
5 changed files with 101 additions and 48 deletions

View file

@ -7,8 +7,6 @@
namespace fc {
class varaint;
class file_appender : public appender {
public:
struct config {

View file

@ -9,7 +9,7 @@ namespace fc {
class thread {
public:
thread( const char* name = "" );
thread( const std::string& name = "" );
thread( thread&& m );
thread& operator=(thread&& t );

View file

@ -1,9 +1,11 @@
#include <boost/filesystem/fstream.hpp>
#include <boost/iostreams/device/mapped_file.hpp>
#include <fc/compress/lzma.hpp>
#include <fc/exception/exception.hpp>
#include <fc/io/fstream.hpp>
#include <lzma_c.h>
#include <iostream>
namespace fc {
std::vector<char> lzma_compress(const std::vector<char>& in)
@ -91,8 +93,8 @@ static size_t lzma_file_output_callback( void* output_ctx, const void* output_bu
size_t dst_len = 0;
if( !exists( ctx->dst_path ) )
{
boost::filesystem::ofstream ofs( ctx->dst_path.string() );
ofs.close();
ofstream fs( ctx->dst_path );
fs.close();
}
else
{

View file

@ -1,21 +1,31 @@
#include <fc/compress/lzma.hpp>
#include <fc/exception/exception.hpp>
#include <fc/io/fstream.hpp>
#include <fc/log/file_appender.hpp>
#include <fc/reflect/variant.hpp>
#include <fc/thread/scoped_lock.hpp>
#include <fc/thread/thread.hpp>
#include <fc/variant.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/mutex.hpp>
#include <iomanip>
#include <queue>
#include <sstream>
namespace fc {
static const string compression_extension( ".lzma" );
class file_appender::impl : public fc::retainable {
public:
config cfg;
ofstream out;
boost::mutex slock;
time_point_sec current_file_start_time;
config cfg;
ofstream out;
boost::mutex slock;
private:
future<void> _rotation_task;
time_point_sec _current_file_start_time;
std::unique_ptr<thread> _compression_thread;
time_point_sec get_file_start_time( const time_point_sec& timestamp, const microseconds& interval )
{
@ -35,45 +45,96 @@ namespace fc {
return time_point::from_iso_string( str );
}
void compress_file( const string& filename )
{
FC_ASSERT( cfg.rotate && cfg.rotation_compression );
FC_ASSERT( _compression_thread );
if( !_compression_thread->is_current() )
{
_compression_thread->async( [this, filename]() { compress_file( filename ); } ).wait();
return;
}
try
{
lzma_compress_file( filename, filename + compression_extension );
remove_all( filename );
}
catch( ... )
{
}
}
public:
impl( const config& c) : cfg( c )
{
if( cfg.rotate )
{
if( cfg.rotation_compression )
_compression_thread.reset( new thread( "compression") );
_rotation_task = async( [this]() { rotate_files( true ); } );
}
}
~impl()
{
try
{
_rotation_task.cancel_and_wait();
if( _compression_thread ) _compression_thread->quit();
}
catch( ... )
{
}
}
void rotate_files( bool initializing = false )
{
if( !cfg.rotate ) return;
FC_ASSERT( cfg.rotate );
const auto now = time_point::now();
const auto start_time = get_file_start_time( now, cfg.rotation_interval );
const auto timestamp_string = timestamp_to_string( start_time );
const auto link_filename = cfg.filename.string();
const auto log_filename = link_filename + "." + timestamp_string;
if( !initializing )
{
if( start_time <= current_file_start_time ) return;
if( start_time <= _current_file_start_time ) return;
fc::scoped_lock<boost::mutex> lock( slock );
out.flush();
out.close();
}
auto log_filename = cfg.filename.string();
const auto link_filename = log_filename;
out.open( log_filename.c_str() );
remove_all( link_filename );
const auto timestamp_string = timestamp_to_string( start_time );
create_hard_link( log_filename, link_filename );
/* Delete old log files */
const auto limit_time = now - cfg.rotation_limit;
auto itr = directory_iterator( fc::path( log_filename ).parent_path() );
auto itr = directory_iterator( fc::path( link_filename ).parent_path() );
for( ; itr != directory_iterator(); itr++ )
{
const auto current_filename = itr->string();
auto current_pos = current_filename.find( log_filename );
if( current_pos != 0 ) continue;
current_pos = log_filename.size() + 1;
const auto current_timestamp = string( current_filename.begin() + current_pos,
current_filename.begin() + current_pos + timestamp_string.size() );
try
{
if( string_to_timestamp( current_timestamp ) < limit_time )
const auto current_filename = itr->string();
auto current_pos = current_filename.find( link_filename );
if( current_pos != 0 ) continue;
current_pos = link_filename.size() + 1;
const auto current_timestamp_str = string( current_filename.begin() + current_pos, /* substr not working */
current_filename.begin() + current_pos + timestamp_string.size() );
const auto current_timestamp = string_to_timestamp( current_timestamp_str );
if( current_timestamp < start_time )
{
remove_all( current_filename );
}
else if( cfg.rotation_compression )
{
if( current_filename.substr( current_filename.size() - 3 ) == ".7z" ) continue;
/* TODO: Possibly move compression to another thread */
lzma_compress_file( current_filename, current_filename + ".7z" );
remove_all( current_filename );
if( current_timestamp < limit_time || file_size( current_filename ) <= 0 )
{
remove_all( current_filename );
continue;
}
if( !cfg.rotation_compression ) continue;
if( current_filename.find( compression_extension ) != string::npos ) continue;
compress_file( current_filename );
}
}
catch( ... )
@ -81,34 +142,29 @@ namespace fc {
}
}
current_file_start_time = start_time;
log_filename += "." + timestamp_string;
out.open( log_filename.c_str() );
create_hard_link( log_filename, link_filename );
_current_file_start_time = start_time;
_rotation_task = schedule( [this]() { rotate_files(); }, now + cfg.rotation_interval );
}
};
file_appender::config::config( const fc::path& p )
:format( "${timestamp} ${thread_name} ${context} ${file}:${line} ${method} ${level}] ${message}" ),
filename(p),flush(true),truncate(true),rotate(false),rotation_compression(false){}
filename(p),flush(true),truncate(true),rotate(false),rotation_compression(true){}
file_appender::file_appender( const variant& args )
:my( new impl() )
:my( new impl( args.as<config>() ) )
{
std::string log_filename;
try
{
my->cfg = args.as<config>();
log_filename = my->cfg.filename.string();
fc::create_directories( fc::path( log_filename ).parent_path() );
if( !my->cfg.rotate ) my->out.open( log_filename.c_str() );
else my->rotate_files( true );
}
catch( ... )
{
std::cerr << "error opening log file: " << log_filename << "\n";
//elog( "%s", fc::except_str().c_str() );
}
}
file_appender::~file_appender(){}
@ -142,12 +198,11 @@ namespace fc {
// fc::string fmt_str = fc::format_string( my->cfg.format, mutable_variant_object(m.get_context())( "message", message) );
/* Write to log file (rotating file beforehand if necessary) */
{
fc::scoped_lock<boost::mutex> lock( my->slock );
if( my->cfg.rotate ) my->rotate_files();
my->out << line.str() << "\t\t\t" << m.get_context().get_file() <<":"<<m.get_context().get_line_number()<<"\n";
if( my->cfg.flush ) my->out.flush();
}
}
}
} // fc

View file

@ -70,11 +70,11 @@ namespace fc {
return t;
}
thread::thread( const char* name ) {
thread::thread( const std::string& name ) {
promise<void>::ptr p(new promise<void>());
boost::thread* t = new boost::thread( [this,p,name]() {
try {
set_thread_name(name); // set thread's name for the debugger to display
set_thread_name(name.c_str()); // set thread's name for the debugger to display
this->my = new thread_d(*this);
current_thread() = this;
p->set_value();
@ -126,17 +126,15 @@ namespace fc {
void thread::debug( const fc::string& d ) { /*my->debug(d);*/ }
void thread::quit() {
//if quiting from a different thread, start quit task on thread.
//if quitting from a different thread, start quit task on thread.
//If we have and know our attached boost thread, wait for it to finish, then return.
if( &current() != this ) {
async( [=](){quit();} );//.wait();
if( my->boost_thread ) {
auto n = name();
ilog( "joining... ${n}", ("n",n) );//n.c_str() );
my->boost_thread->join();
delete my;
my = nullptr;
ilog( "done joining...${n}", ("n",n) ); //n.c_str() );
}
return;
}