fix json rpc server memory leak

This commit is contained in:
Daniel Larimer 2013-01-11 09:12:53 -05:00
parent a5a88a50c7
commit 38f97ef61a
7 changed files with 31 additions and 14 deletions

View file

@ -2,8 +2,7 @@
* @file fc/cmt/asio.hpp
* @brief defines wrappers for boost::asio functions
*/
#ifndef _FC_ASIO_HPP_
#define _FC_ASIO_HPP_
#pragma once
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <fc/future.hpp>
@ -262,4 +261,3 @@ namespace asio {
} } // namespace fc::asio
#endif // _BOOST_CMT_ASIO_HPP_

View file

@ -15,7 +15,7 @@ namespace fc {
uint32_t ip()const { return _ip; }
d
private:
uint32_t _ip;
};

View file

@ -1,5 +1,4 @@
#ifndef _FC_TCP_SOCKET_HPP_
#define _FC_TCP_SOCKET_HPP_
#pragma once
#include <fc/utility.hpp>
#include <fc/fwd.hpp>
#include <fc/iostream.hpp>
@ -55,4 +54,3 @@ namespace fc {
} // namesapce fc
#endif // _FC_TCP_SOCKET_HPP_

View file

@ -22,9 +22,12 @@ namespace fc { namespace json {
~impl() {
try {
// slog( "..." );
self.cancel_pending_requests();
_read_loop_complete.cancel();
// slog( "wait..." );
_read_loop_complete.wait();
// slog( "DONE ..." );
} catch ( ... ) {}
}
@ -48,7 +51,9 @@ namespace fc { namespace json {
} catch ( ... ) {
wlog( "%s", fc::except_str().c_str() );
}
// slog( "cancel...");
self.cancel_pending_requests();
// slog( "close!" );
if( !!on_close ) on_close();
}
};
@ -56,9 +61,10 @@ namespace fc { namespace json {
rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o )
:my( new impl(i,o,*this) ){
}
rpc_stream_connection::rpc_stream_connection(){ slog( "%p...",this); }
rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){ slog( "%p",this); }
rpc_stream_connection::rpc_stream_connection(){ }
rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){ }
rpc_stream_connection::~rpc_stream_connection(){
wlog( "%p", this );
}
// the life of the streams must exceed the life of all copies

View file

@ -31,7 +31,20 @@ namespace fc {
slog( "new connection!" );
my->on_con( *con );
con->start();
rpc_tcp_connection* tcpc = con.get();
my->cons.push_back(con);
con->on_close( [=]() {
for( int i = 0; i < my->cons.size(); ++i ) {
if( my->cons[i].get() == tcpc ) {
fc_swap( my->cons[i], my->cons.back() );
auto tmp = my->cons.back();
my->cons.pop_back();
fc::async([tmp](){slog("free con");});
// TODO: swap to end, pop back
return;
}
}
});
con.reset(new rpc_tcp_connection() );
}
} catch ( ... ) {

View file

@ -21,13 +21,13 @@ namespace fc {
return my->_sock.is_open();
}
tcp_socket::tcp_socket(){}
tcp_socket::tcp_socket(){};
tcp_socket::~tcp_socket(){ }
tcp_socket::~tcp_socket(){};
void tcp_socket::flush() {}
void tcp_socket::close() {
my->_sock.close();
if( is_open() ) my->_sock.close();
}
bool tcp_socket::eof()const {
@ -63,14 +63,16 @@ namespace fc {
boost::system::error_code ec;
size_t w = my->_sock.read_some( boost::asio::buffer( buf, len ), ec );
if( ec == boost::asio::error::would_block ) {
promise<size_t>::ptr p(new promise<size_t>("tcp_socket::write"));
promise<size_t>::ptr p(new promise<size_t>("tcp_socket::readsome"));
my->_sock.async_read_some( boost::asio::buffer(buf, len),
[=]( const boost::system::error_code& ec, size_t bt ) {
slog( "%d ec: %s", bt, boost::system::system_error(ec).what() );
if( !ec ) p->set_value(bt);
else p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) );
});
return p->wait();
} else if (ec ) {
slog( "throw!" );
throw boost::system::system_error(ec);
}
return w;

View file

@ -1,3 +1,3 @@
add_subdirectory( libssh2-1.4.2 )
#add_subdirectory( zlib-1.2.7)
add_subdirectory( sigar )
#add_subdirectory( sigar )