diff --git a/include/fc/process.hpp b/include/fc/process.hpp new file mode 100644 index 0000000..08c74ba --- /dev/null +++ b/include/fc/process.hpp @@ -0,0 +1,56 @@ +#pragma once +#include +#include + +namespace fc { + class istream; + class ostream; + class path; + class string; + template class vector; + + /** + * @brief start and manage an external process + * + * @note this class implements reference semantics. + */ + class process { + public: + enum exec_opts { + open_none = 0, + open_stdin = 0x01, + open_stdout = 0x02, + open_stderr = 0x04, + open_all = open_stdin|open_stdout|open_stderr, + }; + /** + * Return a new process executing the specified exe with the specified args. + */ + fc::future exec( const fc::path& exe, int opt = open_all ); + fc::future exec( const fc::path& exe, const fc::path& wd, int opt = open_all ); + fc::future exec( const fc::path& exe, fc::vector&& args , int opt = open_all ); + fc::future exec( const fc::path& exe, fc::vector&& args, const fc::path& wd, int opt = open_all ); + + /** + * Forcefully kills the process. + */ + void kill(); + + /** + * @brief returns a stream that writes to the process' stdin + */ + fc::ostream& in_stream(); + + /** + * @brief returns a stream that reads from the process' stdout + */ + fc::istream& out_stream(); + /** + * @brief returns a stream that reads from the process' stderr + */ + fc::istream& err_stream(); + + FC_REFERENCE_TYPE(process) + }; + +} // namespace fc diff --git a/src/process.cpp b/src/process.cpp new file mode 100644 index 0000000..705d343 --- /dev/null +++ b/src/process.cpp @@ -0,0 +1,185 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fc { + + namespace bp = boost::process; + namespace io = boost::iostreams; + + class process_sink : public io::sink { + public: + struct category : io::sink::category, io::flushable_tag {}; + typedef char type; + + process_sink( std::shared_ptr& p ):m_in(p){} + + std::streamsize write( const char* s, std::streamsize n ) { + if( !m_in ) return -1; + return static_cast(fc::asio::write( *m_in, + boost::asio::const_buffers_1( s, static_cast(n) ) )); + } + void close() { if(m_in) m_in->close(); } + bool flush() { return true; } + + private: + std::shared_ptr& m_in; + }; + + class process_source : public io::source { + public: + typedef char type; + + process_source( std::shared_ptr& pi ) + :m_pi(pi){} + + std::streamsize read( char* s, std::streamsize n ) { + if( !m_pi ) return -1; + try { + return static_cast(fc::asio::read_some( *m_pi, boost::asio::buffer( s, static_cast(n) ) )); + } catch ( const boost::system::system_error& e ) { + if( e.code() == boost::asio::error::eof ) + return -1; + throw; + } + } + private: + std::shared_ptr& m_pi; + }; +} // namespace fc + +FC_START_SHARED_IMPL( fc::process ) + impl() + :stat( fc::asio::default_io_service() ), + std_out(process_source(outp)), + std_err(process_source(errp)), + std_in(process_sink(inp)), + _ins(std_in), + _outs(std_out), + _errs(std_err){} + + std::shared_ptr child; + std::shared_ptr outp; + std::shared_ptr errp; + std::shared_ptr inp; + + bp::status stat; + bp::context pctx; + + // provide useful buffering + io::stream std_out; + io::stream std_err; + io::stream std_in; + + // adapt to ostream and istream interfaces + fc::ostream_wrapper _ins; + fc::istream_wrapper _outs; + fc::istream_wrapper _errs; +FC_END_SHARED_IMPL +#include + +namespace fc { + +FC_REFERENCE_TYPE_IMPL( process ) + + +fc::future process::exec( const fc::path& exe, fc::vector&& args, + const fc::path& work_dir, int opt ) { + + my->pctx.work_dir = work_dir.string(); + + if( opt&open_stdout) + my->pctx.streams[boost::process::stdout_id] = bp::behavior::async_pipe(); + else + my->pctx.streams[boost::process::stdout_id] = bp::behavior::null(); + + + if( opt& open_stderr ) + my->pctx.streams[boost::process::stderr_id] = bp::behavior::async_pipe(); + else + my->pctx.streams[boost::process::stderr_id] = bp::behavior::null(); + + if( opt& open_stdout ) + my->pctx.streams[boost::process::stdin_id] = bp::behavior::async_pipe(); + else + my->pctx.streams[boost::process::stdin_id] = bp::behavior::close(); + + std::vector a; + a.reserve(args.size()); + for( uint32_t i = 0; i < args.size(); ++i ) { + a.push_back( args[i] ); + } + my->child.reset( new bp::child( bp::create_child( exe.string(), fc::move(a), my->pctx ) ) ); + + if( opt & open_stdout ) { + bp::handle outh = my->child->get_handle( bp::stdout_id ); + my->outp.reset( new bp::pipe( fc::asio::default_io_service(), outh.release() ) ); + } + if( opt & open_stderr ) { + bp::handle errh = my->child->get_handle( bp::stderr_id ); + my->errp.reset( new bp::pipe( fc::asio::default_io_service(), errh.release() ) ); + } + if( opt & open_stdin ) { + bp::handle inh = my->child->get_handle( bp::stdin_id ); + my->inp.reset( new bp::pipe( fc::asio::default_io_service(), inh.release() ) ); + } + + + promise::ptr p(new promise("process")); + my->stat.async_wait( my->child->get_id(), [=]( const boost::system::error_code& ec, int exit_code ) + { + slog( "process::result %1%", exit_code ); + if( !ec ) { + #ifdef BOOST_POSIX_API + try { + if( WIFEXITED(exit_code) ) + p->set_value( WEXITSTATUS(exit_code) ); + else { + FC_THROW_MSG( "process exited with: %s ", strsignal(WTERMSIG(exit_code)) ); + } + } catch ( ... ) { + p->set_exception( fc::current_exception() ); + } + #else + p->set_value(exit_code); + #endif + } + else p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); + }); + return p; + +} + +/** + * Forcefully kills the process. + */ +void process::kill() { + my->child->terminate(); +} + +/** + * @brief returns a stream that writes to the process' stdin + */ +fc::ostream& process::in_stream() { + return my->_ins; +} + +/** + * @brief returns a stream that reads from the process' stdout + */ +fc::istream& process::out_stream() { + return my->_outs; +} +/** + * @brief returns a stream that reads from the process' stderr + */ +fc::istream& process::err_stream() { + return my->_errs; +} + +}