From 39436c70218ded1c6d95d9c7e6214c0b976f8fb8 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Thu, 8 Nov 2012 22:02:07 -0500 Subject: [PATCH] ssh remote process exec --- include/fc/ssh/client.hpp | 3 +- include/fc/ssh/process.hpp | 8 +- src/ssh.cpp | 249 +++++++++++++++++++++++++++++++++++-- tests/ssh.cpp | 17 ++- 4 files changed, 261 insertions(+), 16 deletions(-) diff --git a/include/fc/ssh/client.hpp b/include/fc/ssh/client.hpp index 21c03d6..d0fd10f 100644 --- a/include/fc/ssh/client.hpp +++ b/include/fc/ssh/client.hpp @@ -8,6 +8,7 @@ namespace fc { namespace ssh { namespace detail { struct client_impl; + struct process_impl; }; enum sftp_file_type { @@ -109,7 +110,7 @@ namespace fc { private: friend class process; - friend class process_impl; + friend class detail::process_impl; fc::shared_ptr my; }; diff --git a/include/fc/ssh/process.hpp b/include/fc/ssh/process.hpp index 759a024..cc4dbbe 100644 --- a/include/fc/ssh/process.hpp +++ b/include/fc/ssh/process.hpp @@ -22,9 +22,12 @@ namespace fc { * * Process can only be created by mace::ssh::client. */ - class process : public fc::retainable { + class process { //: public fc::retainable { public: - typedef fc::shared_ptr ptr; + //typedef fc::shared_ptr ptr; + + process( const process& p ); + process( process&& p ); ~process(); @@ -47,6 +50,7 @@ namespace fc { private: friend class client; process( client& c, const fc::string& cmd, const fc::string& pty_type = fc::string() ); + process(); fc::shared_ptr my; }; diff --git a/src/ssh.cpp b/src/ssh.cpp index 8881eb0..c3a9976 100644 --- a/src/ssh.cpp +++ b/src/ssh.cpp @@ -20,30 +20,50 @@ namespace fc { namespace ssh { namespace detail { static int ssh_init = libssh2_init(0); + class process_impl; class process_istream : public fc::istream { public: - virtual size_t readsome( char* buf, size_t len ) { return 0; } - virtual istream& read( char* buf, size_t len ) { return *this; } + process_istream( process_impl& p, int c ) + :proc(p),chan(c){} - virtual bool eof()const { return true; } + virtual size_t readsome( char* buf, size_t len ); + virtual istream& read( char* buf, size_t len ); + + virtual bool eof()const; + + process_impl& proc; + int chan; }; class process_ostream : public fc::ostream { public: - virtual ostream& write( const char* buf, size_t len ) { return *this; } - virtual void close(){} - virtual void flush(){} + process_ostream( process_impl& p ) + :proc(p){} + + virtual ostream& write( const char* buf, size_t len ); + virtual void close(); + virtual void flush(); + + process_impl& proc; }; class process_impl : public fc::retainable { public: - process_impl( const client& c ):sshc(c){} + process_impl( const client& c, const fc::string& cmd, const fc::string& pty_type ); + fc::string command; + fc::promise::ptr result; + LIBSSH2_CHANNEL* chan; + int read_some( char* data, size_t len, int stream_id ); + int write_some( const char* data, size_t len, int stream_id ); + void flush(); + void send_eof(); + + client sshc; process_istream std_err; process_istream std_out; process_ostream std_in; - client sshc; }; @@ -68,6 +88,7 @@ namespace fc { namespace ssh { fc::promise::ptr read_prom; fc::promise::ptr write_prom; + LIBSSH2_CHANNEL* open_channel( const fc::string& pty_type ); static void kbd_callback(const char *name, int name_len, const char *instruction, int instruction_len, int num_prompts, const LIBSSH2_USERAUTH_KBDINT_PROMPT *prompts, @@ -647,7 +668,219 @@ namespace fc { namespace ssh { fc::istream& process::err_stream() { return my->std_err; } + + process::process( const process& p ) + :my(p.my){ } + process::process( process&& p ) + :my(fc::move(p.my)){ } + process::process( client& c, const fc::string& cmd, const fc::string& pty_type) + :my( new detail::process_impl( c, cmd, pty_type ) ) { } + + + void detail::process_impl::flush() { + if( !chan ) return; + int ec = libssh2_channel_flush_ex( chan, LIBSSH2_CHANNEL_FLUSH_EXTENDED_DATA); + while( ec == LIBSSH2_ERROR_EAGAIN ) { + sshc.my->wait_on_socket(); + ec = libssh2_channel_flush_ex( chan, LIBSSH2_CHANNEL_FLUSH_EXTENDED_DATA ); + } + if( ec < 0 ) { + FC_THROW_MSG( "flush failed: channel error %d", ec ); + } + } + int detail::process_impl::read_some( char* data, size_t len, int stream_id ){ + if( !sshc.my->session ) { FC_THROW_MSG( "Session closed" ); } + + int rc; + char* buf = data; + size_t buflen = len; + do { + rc = libssh2_channel_read_ex( chan, stream_id, buf, buflen ); + if( rc > 0 ) { + buf += rc; + buflen -= rc; + return buf-data; + } else if( rc == 0 ) { + if( libssh2_channel_eof( chan ) ) { + return -1; // eof + } + sshc.my->wait_on_socket(); + } else { + if( rc == LIBSSH2_ERROR_EAGAIN ) { + if( 0 < (buf-data) ) { + return buf-data; + } + else { + sshc.my->wait_on_socket(); + rc = 0; + continue; + } + } else { + char* msg; + if( !sshc.my || !sshc.my->session ) { FC_THROW_MSG( "Session closed" ); } + rc = libssh2_session_last_error( sshc.my->session, &msg, 0, 0 ); + FC_THROW_MSG( "read failed: %s - %s", rc, msg ); return buf-data; + } + } + } while( rc >= 0 && buflen); + return buf-data; + } + int detail::process_impl::write_some( const char* data, size_t len, int stream_id ) { + if( !sshc.my->session ) { FC_THROW_MSG( "Session closed" ); } + + int rc; + const char* buf = data; + size_t buflen = len; + do { + rc = libssh2_channel_write_ex( chan, stream_id, buf, buflen ); + if( rc > 0 ) { + buf += rc; + buflen -= rc; + return buf-data; + } else if( rc == 0 ) { + if( libssh2_channel_eof( chan ) ) { + elog( "return %1%", -1 ); + FC_THROW_MSG( "EOF" ); + //return -1; // eof + } + } else { + + if( rc == LIBSSH2_ERROR_EAGAIN ) { + if( 0 < (buf-data) ) { + return buf-data; + } + else { + sshc.my->wait_on_socket(); + rc = 0; + continue; + } + } else { + char* msg; + rc = libssh2_session_last_error( sshc.my->session, &msg, 0, 0 ); + FC_THROW_MSG( "write failed: %s - %s", rc, msg ); + return buf-data; + } + } + } while( rc >= 0 && buflen); + return buf-data; + } + void detail::process_impl::send_eof() { + if( sshc.my->session ) { + int ec = libssh2_channel_send_eof( chan ); + while( ec == LIBSSH2_ERROR_EAGAIN ) { + sshc.my->wait_on_socket(); + ec = libssh2_channel_send_eof( chan ); + } + if( ec ) { + char* msg = 0; + ec = libssh2_session_last_error( sshc.my->session, &msg, 0, 0 ); + FC_THROW_MSG( "send eof failed: %s - %s", ec, msg ); + } + } + } + + size_t detail::process_istream::readsome( char* buf, size_t len ) { + return proc.read_some( buf, len, chan ); + } + istream& detail::process_istream::read( char* buf, size_t len ) { + size_t r = 0; + do { + r += proc.read_some( buf + r, len - r, chan ); + } while( r < len ); + + return *this; + } + + bool detail::process_istream::eof()const { + return libssh2_channel_eof( proc.chan ); + } + + ostream& detail::process_ostream::write( const char* buf, size_t len ) { + size_t wrote = 0; + do { + wrote += proc.write_some( buf+wrote, len-wrote, 0 ); + } while( wrote < len ); + return *this; + } + void detail::process_ostream::close(){ + proc.send_eof(); + } + void detail::process_ostream::flush(){ + proc.flush(); + } + detail::process_impl::process_impl( const client& c, const fc::string& cmd, const fc::string& pty_type ) + :sshc(c),std_err(*this,SSH_EXTENDED_DATA_STDERR),std_out(*this,0),std_in(*this) + { + chan = c.my->open_channel(pty_type); + + /* + unsigned int rw_size = 0; + int ec = libssh2_channel_receive_window_adjust2(chan, 1024*64, 0, &rw_size ); + while( ec == LIBSSH2_ERROR_EAGAIN ) { + sshc->my->wait_on_socket(); + ec = libssh2_channel_receive_window_adjust2(chan, 1024*64, 0, &rw_size ); + } + elog( "rwindow size %1%", rw_size ); + */ + + + int ec = libssh2_channel_handle_extended_data2(chan, LIBSSH2_CHANNEL_EXTENDED_DATA_NORMAL ); + while( ec == LIBSSH2_ERROR_EAGAIN ) { + sshc.my->wait_on_socket(); + ec = libssh2_channel_handle_extended_data2(chan, LIBSSH2_CHANNEL_EXTENDED_DATA_NORMAL ); + } + + if( cmd.size() == 0 ) { + ec = libssh2_channel_shell(chan ); + while( ec == LIBSSH2_ERROR_EAGAIN ) { + sshc.my->wait_on_socket(); + ec = libssh2_channel_shell(chan); + } + } else { + ec = libssh2_channel_exec( chan, cmd.c_str() ); + while( ec == LIBSSH2_ERROR_EAGAIN ) { + sshc.my->wait_on_socket(); + ec = libssh2_channel_exec( chan, cmd.c_str() ); + } + } + if( ec ) { + char* msg = 0; + ec = libssh2_session_last_error( sshc.my->session, &msg, 0, 0 ); + FC_THROW_MSG( "libssh2_channel_exec failed: %s - %s", ec, msg ); + } + } + LIBSSH2_CHANNEL* detail::client_impl::open_channel( const fc::string& pty_type ) { + LIBSSH2_CHANNEL* chan = 0; + chan = libssh2_channel_open_session(session); + if( !chan ) { + char* msg; + int ec = libssh2_session_last_error( session, &msg, 0, 0 ); + while( !chan && ec == LIBSSH2_ERROR_EAGAIN ) { + wait_on_socket(); + chan = libssh2_channel_open_session(session); + ec = libssh2_session_last_error( session, &msg, 0, 0 ); + } + if( !chan ) { + FC_THROW_MSG( "libssh2_channel_open_session failed: %s - %s", ec, msg ); + } + } + + if( pty_type.size() ) { + int ec = libssh2_channel_request_pty(chan,pty_type.c_str()); + while( ec == LIBSSH2_ERROR_EAGAIN ) { + wait_on_socket(); + ec = libssh2_channel_request_pty(chan,pty_type.c_str()); + } + if( 0 != ec ) { + char* msg; + ec = libssh2_session_last_error( session, &msg, 0, 0 ); + FC_THROW_MSG( "libssh2_channel_req_pty failed: %s - %s", ec, msg ); + } + } + return chan; + } + } } diff --git a/tests/ssh.cpp b/tests/ssh.cpp index f434c0f..e02ae39 100644 --- a/tests/ssh.cpp +++ b/tests/ssh.cpp @@ -6,16 +6,23 @@ int main( int argc, char** argv ) { try { - if( argc < 3 ) { - fc::cout<>pw; fc::ssh::client c; c.connect( "dlarimer", pw, "localhost" ); - c.scp_send( argv[1], argv[2] ); + fc::ssh::process proc = c.exec( "/bin/ls" ); + while( !proc.out_stream().eof() ) { + fc::string line; + fc::getline( proc.out_stream(), line ); + fc::cout<