This commit is contained in:
Daniel Larimer 2012-11-24 19:39:19 -05:00
parent 55456d34ce
commit 7981c2fb45
9 changed files with 41 additions and 22 deletions

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <fc/iostream.hpp> #include <fc/iostream.hpp>
#include <fc/shared_ptr.hpp> #include <fc/shared_ptr.hpp>
#include <fc/log.hpp>
namespace fc { namespace fc {
/** /**
@ -19,6 +20,7 @@ namespace fc {
return *this; return *this;
} }
virtual void close() { virtual void close() {
// TODO: move to cpp
my->close(); my->close();
} }
virtual void flush() { virtual void flush() {
@ -100,7 +102,7 @@ namespace fc {
virtual void read( char* buf, size_t len ) { virtual void read( char* buf, size_t len ) {
st.read(buf,len); st.read(buf,len);
} }
virtual bool eof()const { return st.eof(); } virtual bool eof()const { return st.eof() || !st.good(); }
T& st; T& st;
}; };

View file

@ -23,7 +23,6 @@ namespace fc { namespace json {
} }
fc::future<int> exec( const fc::path& exe, fc::vector<fc::string>&& args, fc::future<int> exec( const fc::path& exe, fc::vector<fc::string>&& args,
const fc::path& wd, int opt = fc::process::open_all ) { const fc::path& wd, int opt = fc::process::open_all ) {
slog( "cd %s; %s", wd.generic_string().c_str(), exe.generic_string().c_str() );
auto r = _proc.exec( canonical(exe), fc::move(args), wd, opt ); auto r = _proc.exec( canonical(exe), fc::move(args), wd, opt );
_con.reset( new fc::json::rpc_stream_connection( _proc.out_stream(), _proc.in_stream() ) ); _con.reset( new fc::json::rpc_stream_connection( _proc.out_stream(), _proc.in_stream() ) );
this->_vtable.reset(new fc::detail::vtable<InterfaceType,fc::json::detail::rpc_member>() ); this->_vtable.reset(new fc::detail::vtable<InterfaceType,fc::json::detail::rpc_member>() );
@ -32,7 +31,7 @@ namespace fc { namespace json {
return r; return r;
} }
void kill() { _con->close(); _proc.kill(); } void kill() { _con->close(); _proc.kill(); }
/** /**
* @brief returns a stream that reads from the process' stderr * @brief returns a stream that reads from the process' stderr

View file

@ -1,16 +1,21 @@
#ifndef _FC_UDP_SOCKET_HPP_ #ifndef _FC_UDP_SOCKET_HPP_
#define _FC_UDP_SOCKET_HPP_ #define _FC_UDP_SOCKET_HPP_
#include <fc/utility.hpp> #include <fc/utility.hpp>
#include <fc/fwd.hpp> #include <fc/shared_ptr.hpp>
namespace fc { namespace fc {
namespace ip { namespace ip {
class endpoint; class endpoint;
} }
/**
* The udp_socket class has reference semantics, all copies will
* refer to the same underlying socket.
*/
class udp_socket { class udp_socket {
public: public:
udp_socket(); udp_socket();
udp_socket( const udp_socket& s );
~udp_socket(); ~udp_socket();
void open(); void open();
@ -24,8 +29,8 @@ namespace fc {
fc::ip::endpoint local_endpoint()const; fc::ip::endpoint local_endpoint()const;
private: private:
class impl; class impl;
fwd<impl,32> my; fc::shared_ptr<impl> my;
}; };
} }

View file

@ -14,7 +14,6 @@ namespace fc { namespace json {
impl( fc::istream& i, fc::ostream& o, rpc_stream_connection& s ) impl( fc::istream& i, fc::ostream& o, rpc_stream_connection& s )
:in(i),out(o),self(s){ :in(i),out(o),self(s){
slog( "%p", this );
_read_loop_complete = fc::async( [=](){ read_loop(); } ); _read_loop_complete = fc::async( [=](){ read_loop(); } );
} }
@ -28,20 +27,19 @@ namespace fc { namespace json {
fc::future<void> _read_loop_complete; fc::future<void> _read_loop_complete;
void read_loop() { void read_loop() {
slog( "%p", this );
fc::string line; fc::string line;
fc::getline( in, line ); fc::getline( in, line );
while( !in.eof() ) { while( !in.eof() ) {
try { try {
fc::value v= fc::json::from_string( line ); fc::value v= fc::json::from_string( line );
slog( "%s", fc::json::to_string(v).c_str() ); //slog( "%s", fc::json::to_string(v).c_str() );
self.handle_message(v); self.handle_message(v);
} catch (...) { } catch (...) {
wlog( "%s", fc::except_str().c_str() ); wlog( "%s", fc::except_str().c_str() );
return;
} }
fc::getline( in, line ); fc::getline( in, line );
} }
slog( "close read loop" );
self.cancel_pending_requests(); self.cancel_pending_requests();
if( !!on_close ) on_close(); if( !!on_close ) on_close();
} }
@ -50,7 +48,7 @@ namespace fc { namespace json {
rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o ) rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o )
:my( new impl(i,o,*this) ){ :my( new impl(i,o,*this) ){
} }
rpc_stream_connection::rpc_stream_connection(){ slog( "default" ); } rpc_stream_connection::rpc_stream_connection(){ }
rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){} rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){}
rpc_stream_connection::~rpc_stream_connection(){ rpc_stream_connection::~rpc_stream_connection(){
// slog( "%p", my.get() ); // slog( "%p", my.get() );
@ -60,7 +58,6 @@ namespace fc { namespace json {
// of this rpc_stream_connection // of this rpc_stream_connection
void rpc_stream_connection::open( fc::istream& i, fc::ostream& o) { void rpc_stream_connection::open( fc::istream& i, fc::ostream& o) {
my.reset( new impl(i,o,*this) ); my.reset( new impl(i,o,*this) );
slog( "open... %p", my.get() );
} }
// cancels all pending requests, closes the ostream // cancels all pending requests, closes the ostream

View file

@ -7,7 +7,7 @@ namespace fc {
class rpc_tcp_connection::impl : public fc::retainable { class rpc_tcp_connection::impl : public fc::retainable {
public: public:
tcp_socket sock; tcp_socket sock;
~impl(){ slog( "%p", this );} ~impl(){ }
}; };
rpc_tcp_connection::rpc_tcp_connection() rpc_tcp_connection::rpc_tcp_connection()
@ -22,15 +22,12 @@ namespace fc {
void rpc_tcp_connection::connect_to( const fc::ip::endpoint& ep ) { void rpc_tcp_connection::connect_to( const fc::ip::endpoint& ep ) {
my->sock.connect_to(ep); my->sock.connect_to(ep);
wlog( "Connected %p", my.get() );
open( my->sock, my->sock ); open( my->sock, my->sock );
} }
void rpc_tcp_connection::start() { void rpc_tcp_connection::start() {
slog( "open... %p", my.get() );
open( my->sock, my->sock ); open( my->sock, my->sock );
} }
void rpc_tcp_connection::close() { void rpc_tcp_connection::close() {
slog( "close %p", my.get() );
rpc_stream_connection::close(); rpc_stream_connection::close();
//my->sock.close(); //my->sock.close();
} }

View file

@ -22,8 +22,10 @@ namespace fc {
} }
void rpc_tcp_server::listen( uint16_t port ) { void rpc_tcp_server::listen( uint16_t port ) {
my->tcp_serv.listen(port); my->tcp_serv.listen(port);
fc::async([this](){ fc::async([this](){
try {
rpc_tcp_connection::ptr con(new rpc_tcp_connection() ); rpc_tcp_connection::ptr con(new rpc_tcp_connection() );
while( my->tcp_serv.accept( con->get_socket() ) ) { while( my->tcp_serv.accept( con->get_socket() ) ) {
slog( "new connection!" ); slog( "new connection!" );
@ -32,6 +34,9 @@ namespace fc {
my->cons.push_back(con); my->cons.push_back(con);
con.reset(new rpc_tcp_connection() ); con.reset(new rpc_tcp_connection() );
} }
} catch ( ... ) {
wlog( "tcp listen failed..." );
}
}); });
} }

View file

@ -43,7 +43,7 @@ namespace fc {
try { try {
return static_cast<std::streamsize>(fc::asio::read_some( *m_pi, boost::asio::buffer( s, static_cast<size_t>(n) ) )); return static_cast<std::streamsize>(fc::asio::read_some( *m_pi, boost::asio::buffer( s, static_cast<size_t>(n) ) ));
} catch ( const boost::system::system_error& e ) { } catch ( const boost::system::system_error& e ) {
if( e.code() == boost::asio::error::eof ) if( e.code() == boost::asio::error::eof )
return -1; return -1;
throw; throw;
} }
@ -63,6 +63,17 @@ FC_START_SHARED_IMPL( fc::process )
_outs(std_out), _outs(std_out),
_errs(std_err){} _errs(std_err){}
~impl() {
try {
if( inp ) {
inp->close();
}
child->terminate();
}catch(...) {
wlog( "caught exception cleaning up process" );
}
}
std::shared_ptr<bp::child> child; std::shared_ptr<bp::child> child;
std::shared_ptr<bp::pipe> outp; std::shared_ptr<bp::pipe> outp;
std::shared_ptr<bp::pipe> errp; std::shared_ptr<bp::pipe> errp;
@ -133,7 +144,7 @@ fc::future<int> process::exec( const fc::path& exe, fc::vector<fc::string>&& arg
promise<int>::ptr p(new promise<int>("process")); promise<int>::ptr p(new promise<int>("process"));
my->stat.async_wait( my->child->get_id(), [=]( const boost::system::error_code& ec, int exit_code ) my->stat.async_wait( my->child->get_id(), [=]( const boost::system::error_code& ec, int exit_code )
{ {
slog( "process::result %d", exit_code ); //slog( "process::result %d", exit_code );
if( !ec ) { if( !ec ) {
#ifdef BOOST_POSIX_API #ifdef BOOST_POSIX_API
try { try {
@ -159,7 +170,7 @@ fc::future<int> process::exec( const fc::path& exe, fc::vector<fc::string>&& arg
* Forcefully kills the process. * Forcefully kills the process.
*/ */
void process::kill() { void process::kill() {
my->child->terminate(); my->child->terminate();
} }
/** /**

View file

@ -22,7 +22,7 @@ namespace fc {
tcp_socket::tcp_socket(){} tcp_socket::tcp_socket(){}
tcp_socket::~tcp_socket(){ slog( "%p", &my); } tcp_socket::~tcp_socket(){ }
void tcp_socket::flush() {} void tcp_socket::flush() {}
void tcp_socket::close() { void tcp_socket::close() {

View file

@ -6,7 +6,7 @@
namespace fc { namespace fc {
class udp_socket::impl { class udp_socket::impl : public fc::retainable {
public: public:
impl():_sock( fc::asio::default_io_service() ){} impl():_sock( fc::asio::default_io_service() ){}
~impl(){ ~impl(){
@ -23,8 +23,11 @@ namespace fc {
return fc::ip::endpoint( e.address().to_v4().to_ulong(), e.port() ); return fc::ip::endpoint( e.address().to_v4().to_ulong(), e.port() );
} }
udp_socket::udp_socket() { udp_socket::udp_socket()
:my( new impl() ) {
} }
udp_socket::udp_socket( const udp_socket& s )
:my(s.my){}
udp_socket::~udp_socket() { udp_socket::~udp_socket() {
} }