From 38f97ef61a8abff9c6077b26f9cefc3e54c14aba Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Fri, 11 Jan 2013 09:12:53 -0500 Subject: [PATCH] fix json rpc server memory leak --- include/fc/asio.hpp | 4 +--- include/fc/endpoint.hpp | 2 +- include/fc/tcp_socket.hpp | 4 +--- src/json_rpc_stream_connection.cpp | 10 ++++++++-- src/json_rpc_tcp_server.cpp | 13 +++++++++++++ src/tcp_socket.cpp | 10 ++++++---- vendor/CMakeLists.txt | 2 +- 7 files changed, 31 insertions(+), 14 deletions(-) diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index b2356a5..c63067e 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -2,8 +2,7 @@ * @file fc/cmt/asio.hpp * @brief defines wrappers for boost::asio functions */ -#ifndef _FC_ASIO_HPP_ -#define _FC_ASIO_HPP_ +#pragma once #include #include #include @@ -262,4 +261,3 @@ namespace asio { } } // namespace fc::asio -#endif // _BOOST_CMT_ASIO_HPP_ diff --git a/include/fc/endpoint.hpp b/include/fc/endpoint.hpp index 286592e..f1805f3 100644 --- a/include/fc/endpoint.hpp +++ b/include/fc/endpoint.hpp @@ -15,7 +15,7 @@ namespace fc { uint32_t ip()const { return _ip; } -d + private: uint32_t _ip; }; diff --git a/include/fc/tcp_socket.hpp b/include/fc/tcp_socket.hpp index 408fb82..e2eb354 100644 --- a/include/fc/tcp_socket.hpp +++ b/include/fc/tcp_socket.hpp @@ -1,5 +1,4 @@ -#ifndef _FC_TCP_SOCKET_HPP_ -#define _FC_TCP_SOCKET_HPP_ +#pragma once #include #include #include @@ -55,4 +54,3 @@ namespace fc { } // namesapce fc -#endif // _FC_TCP_SOCKET_HPP_ diff --git a/src/json_rpc_stream_connection.cpp b/src/json_rpc_stream_connection.cpp index 505047f..285c787 100644 --- a/src/json_rpc_stream_connection.cpp +++ b/src/json_rpc_stream_connection.cpp @@ -22,9 +22,12 @@ namespace fc { namespace json { ~impl() { try { + // slog( "..." ); self.cancel_pending_requests(); _read_loop_complete.cancel(); + // slog( "wait..." ); _read_loop_complete.wait(); + // slog( "DONE ..." ); } catch ( ... ) {} } @@ -48,7 +51,9 @@ namespace fc { namespace json { } catch ( ... ) { wlog( "%s", fc::except_str().c_str() ); } + // slog( "cancel..."); self.cancel_pending_requests(); + // slog( "close!" ); if( !!on_close ) on_close(); } }; @@ -56,9 +61,10 @@ namespace fc { namespace json { rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o ) :my( new impl(i,o,*this) ){ } - rpc_stream_connection::rpc_stream_connection(){ slog( "%p...",this); } - rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){ slog( "%p",this); } + rpc_stream_connection::rpc_stream_connection(){ } + rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){ } rpc_stream_connection::~rpc_stream_connection(){ + wlog( "%p", this ); } // the life of the streams must exceed the life of all copies diff --git a/src/json_rpc_tcp_server.cpp b/src/json_rpc_tcp_server.cpp index 73c9d0c..f5e6d39 100644 --- a/src/json_rpc_tcp_server.cpp +++ b/src/json_rpc_tcp_server.cpp @@ -31,7 +31,20 @@ namespace fc { slog( "new connection!" ); my->on_con( *con ); con->start(); + rpc_tcp_connection* tcpc = con.get(); my->cons.push_back(con); + con->on_close( [=]() { + for( int i = 0; i < my->cons.size(); ++i ) { + if( my->cons[i].get() == tcpc ) { + fc_swap( my->cons[i], my->cons.back() ); + auto tmp = my->cons.back(); + my->cons.pop_back(); + fc::async([tmp](){slog("free con");}); + // TODO: swap to end, pop back + return; + } + } + }); con.reset(new rpc_tcp_connection() ); } } catch ( ... ) { diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index c94f31e..6b0a440 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -21,13 +21,13 @@ namespace fc { return my->_sock.is_open(); } - tcp_socket::tcp_socket(){} + tcp_socket::tcp_socket(){}; - tcp_socket::~tcp_socket(){ } + tcp_socket::~tcp_socket(){}; void tcp_socket::flush() {} void tcp_socket::close() { - my->_sock.close(); + if( is_open() ) my->_sock.close(); } bool tcp_socket::eof()const { @@ -63,14 +63,16 @@ namespace fc { boost::system::error_code ec; size_t w = my->_sock.read_some( boost::asio::buffer( buf, len ), ec ); if( ec == boost::asio::error::would_block ) { - promise::ptr p(new promise("tcp_socket::write")); + promise::ptr p(new promise("tcp_socket::readsome")); my->_sock.async_read_some( boost::asio::buffer(buf, len), [=]( const boost::system::error_code& ec, size_t bt ) { + slog( "%d ec: %s", bt, boost::system::system_error(ec).what() ); if( !ec ) p->set_value(bt); else p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) ); }); return p->wait(); } else if (ec ) { + slog( "throw!" ); throw boost::system::system_error(ec); } return w; diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index b12310e..16ac56b 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -1,3 +1,3 @@ add_subdirectory( libssh2-1.4.2 ) #add_subdirectory( zlib-1.2.7) -add_subdirectory( sigar ) +#add_subdirectory( sigar )