From 7981c2fb450d3f5f27e2a06beac64885b87cedf5 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Sat, 24 Nov 2012 19:39:19 -0500 Subject: [PATCH] fix bugs --- include/fc/iostream_wrapper.hpp | 4 +++- include/fc/json_rpc_process_client.hpp | 3 +-- include/fc/udp_socket.hpp | 11 ++++++++--- src/json_rpc_stream_connection.cpp | 9 +++------ src/json_rpc_tcp_connection.cpp | 5 +---- src/json_rpc_tcp_server.cpp | 5 +++++ src/process.cpp | 17 ++++++++++++++--- src/tcp_socket.cpp | 2 +- src/udp_socket.cpp | 7 +++++-- 9 files changed, 41 insertions(+), 22 deletions(-) diff --git a/include/fc/iostream_wrapper.hpp b/include/fc/iostream_wrapper.hpp index 3b5fcdb..ed3affa 100644 --- a/include/fc/iostream_wrapper.hpp +++ b/include/fc/iostream_wrapper.hpp @@ -1,6 +1,7 @@ #pragma once #include #include +#include namespace fc { /** @@ -19,6 +20,7 @@ namespace fc { return *this; } virtual void close() { + // TODO: move to cpp my->close(); } virtual void flush() { @@ -100,7 +102,7 @@ namespace fc { virtual void read( char* buf, size_t len ) { st.read(buf,len); } - virtual bool eof()const { return st.eof(); } + virtual bool eof()const { return st.eof() || !st.good(); } T& st; }; diff --git a/include/fc/json_rpc_process_client.hpp b/include/fc/json_rpc_process_client.hpp index 88c7214..d2fe2fe 100644 --- a/include/fc/json_rpc_process_client.hpp +++ b/include/fc/json_rpc_process_client.hpp @@ -23,7 +23,6 @@ namespace fc { namespace json { } fc::future exec( const fc::path& exe, fc::vector&& args, const fc::path& wd, int opt = fc::process::open_all ) { - slog( "cd %s; %s", wd.generic_string().c_str(), exe.generic_string().c_str() ); auto r = _proc.exec( canonical(exe), fc::move(args), wd, opt ); _con.reset( new fc::json::rpc_stream_connection( _proc.out_stream(), _proc.in_stream() ) ); this->_vtable.reset(new fc::detail::vtable() ); @@ -32,7 +31,7 @@ namespace fc { namespace json { return r; } - void kill() { _con->close(); _proc.kill(); } + void kill() { _con->close(); _proc.kill(); } /** * @brief returns a stream that reads from the process' stderr diff --git a/include/fc/udp_socket.hpp b/include/fc/udp_socket.hpp index 101c381..d7b5a25 100644 --- a/include/fc/udp_socket.hpp +++ b/include/fc/udp_socket.hpp @@ -1,16 +1,21 @@ #ifndef _FC_UDP_SOCKET_HPP_ #define _FC_UDP_SOCKET_HPP_ #include -#include +#include namespace fc { namespace ip { class endpoint; } + /** + * The udp_socket class has reference semantics, all copies will + * refer to the same underlying socket. + */ class udp_socket { public: udp_socket(); + udp_socket( const udp_socket& s ); ~udp_socket(); void open(); @@ -24,8 +29,8 @@ namespace fc { fc::ip::endpoint local_endpoint()const; private: - class impl; - fwd my; + class impl; + fc::shared_ptr my; }; } diff --git a/src/json_rpc_stream_connection.cpp b/src/json_rpc_stream_connection.cpp index 7cb0531..ecf4f27 100644 --- a/src/json_rpc_stream_connection.cpp +++ b/src/json_rpc_stream_connection.cpp @@ -14,7 +14,6 @@ namespace fc { namespace json { impl( fc::istream& i, fc::ostream& o, rpc_stream_connection& s ) :in(i),out(o),self(s){ - slog( "%p", this ); _read_loop_complete = fc::async( [=](){ read_loop(); } ); } @@ -28,20 +27,19 @@ namespace fc { namespace json { fc::future _read_loop_complete; void read_loop() { - slog( "%p", this ); fc::string line; fc::getline( in, line ); while( !in.eof() ) { try { fc::value v= fc::json::from_string( line ); - slog( "%s", fc::json::to_string(v).c_str() ); + //slog( "%s", fc::json::to_string(v).c_str() ); self.handle_message(v); } catch (...) { wlog( "%s", fc::except_str().c_str() ); + return; } fc::getline( in, line ); } - slog( "close read loop" ); self.cancel_pending_requests(); if( !!on_close ) on_close(); } @@ -50,7 +48,7 @@ namespace fc { namespace json { rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o ) :my( new impl(i,o,*this) ){ } - rpc_stream_connection::rpc_stream_connection(){ slog( "default" ); } + rpc_stream_connection::rpc_stream_connection(){ } rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){} rpc_stream_connection::~rpc_stream_connection(){ // slog( "%p", my.get() ); @@ -60,7 +58,6 @@ namespace fc { namespace json { // of this rpc_stream_connection void rpc_stream_connection::open( fc::istream& i, fc::ostream& o) { my.reset( new impl(i,o,*this) ); - slog( "open... %p", my.get() ); } // cancels all pending requests, closes the ostream diff --git a/src/json_rpc_tcp_connection.cpp b/src/json_rpc_tcp_connection.cpp index 845c6be..52dfee9 100644 --- a/src/json_rpc_tcp_connection.cpp +++ b/src/json_rpc_tcp_connection.cpp @@ -7,7 +7,7 @@ namespace fc { class rpc_tcp_connection::impl : public fc::retainable { public: tcp_socket sock; - ~impl(){ slog( "%p", this );} + ~impl(){ } }; rpc_tcp_connection::rpc_tcp_connection() @@ -22,15 +22,12 @@ namespace fc { void rpc_tcp_connection::connect_to( const fc::ip::endpoint& ep ) { my->sock.connect_to(ep); - wlog( "Connected %p", my.get() ); open( my->sock, my->sock ); } void rpc_tcp_connection::start() { - slog( "open... %p", my.get() ); open( my->sock, my->sock ); } void rpc_tcp_connection::close() { - slog( "close %p", my.get() ); rpc_stream_connection::close(); //my->sock.close(); } diff --git a/src/json_rpc_tcp_server.cpp b/src/json_rpc_tcp_server.cpp index 01f85b5..73c9d0c 100644 --- a/src/json_rpc_tcp_server.cpp +++ b/src/json_rpc_tcp_server.cpp @@ -22,8 +22,10 @@ namespace fc { } void rpc_tcp_server::listen( uint16_t port ) { + my->tcp_serv.listen(port); fc::async([this](){ + try { rpc_tcp_connection::ptr con(new rpc_tcp_connection() ); while( my->tcp_serv.accept( con->get_socket() ) ) { slog( "new connection!" ); @@ -32,6 +34,9 @@ namespace fc { my->cons.push_back(con); con.reset(new rpc_tcp_connection() ); } + } catch ( ... ) { + wlog( "tcp listen failed..." ); + } }); } diff --git a/src/process.cpp b/src/process.cpp index 9773edd..333e9ad 100644 --- a/src/process.cpp +++ b/src/process.cpp @@ -43,7 +43,7 @@ namespace fc { try { return static_cast(fc::asio::read_some( *m_pi, boost::asio::buffer( s, static_cast(n) ) )); } catch ( const boost::system::system_error& e ) { - if( e.code() == boost::asio::error::eof ) + if( e.code() == boost::asio::error::eof ) return -1; throw; } @@ -63,6 +63,17 @@ FC_START_SHARED_IMPL( fc::process ) _outs(std_out), _errs(std_err){} + ~impl() { + try { + if( inp ) { + inp->close(); + } + child->terminate(); + }catch(...) { + wlog( "caught exception cleaning up process" ); + } + } + std::shared_ptr child; std::shared_ptr outp; std::shared_ptr errp; @@ -133,7 +144,7 @@ fc::future process::exec( const fc::path& exe, fc::vector&& arg promise::ptr p(new promise("process")); my->stat.async_wait( my->child->get_id(), [=]( const boost::system::error_code& ec, int exit_code ) { - slog( "process::result %d", exit_code ); + //slog( "process::result %d", exit_code ); if( !ec ) { #ifdef BOOST_POSIX_API try { @@ -159,7 +170,7 @@ fc::future process::exec( const fc::path& exe, fc::vector&& arg * Forcefully kills the process. */ void process::kill() { - my->child->terminate(); + my->child->terminate(); } /** diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index 3613e7b..4df563c 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -22,7 +22,7 @@ namespace fc { tcp_socket::tcp_socket(){} - tcp_socket::~tcp_socket(){ slog( "%p", &my); } + tcp_socket::~tcp_socket(){ } void tcp_socket::flush() {} void tcp_socket::close() { diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index 530590d..814f5f2 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -6,7 +6,7 @@ namespace fc { - class udp_socket::impl { + class udp_socket::impl : public fc::retainable { public: impl():_sock( fc::asio::default_io_service() ){} ~impl(){ @@ -23,8 +23,11 @@ namespace fc { return fc::ip::endpoint( e.address().to_v4().to_ulong(), e.port() ); } - udp_socket::udp_socket() { + udp_socket::udp_socket() + :my( new impl() ) { } + udp_socket::udp_socket( const udp_socket& s ) + :my(s.my){} udp_socket::~udp_socket() { }