From eed62c83381955328e5fe2ef4a8ba7a75e810115 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Thu, 17 Jul 2014 17:03:25 -0400 Subject: [PATCH] sockets now wait for any pending write or read operations to finish before destructing --- include/fc/asio.hpp | 12 ++++++------ include/fc/thread/future.hpp | 4 ++-- src/network/tcp_socket.cpp | 8 ++++++-- src/thread/thread.cpp | 8 ++++---- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index 8723290..ef8f6b9 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -79,11 +79,11 @@ namespace asio { * @return the number of bytes read. */ template - size_t read_some(AsyncReadStream& s, const MutableBufferSequence& buf) + future read_some(AsyncReadStream& s, const MutableBufferSequence& buf) { promise::ptr p(new promise("fc::asio::async_read_some")); s.async_read_some(buf, boost::bind(detail::read_write_handler, p, _1, _2)); - return p->wait(); + return p;//->wait(); } template @@ -117,10 +117,10 @@ namespace asio { * @return the number of bytes written */ template - size_t write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { + future write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write_some")); s.async_write_some( buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); - return p->wait(); + return p; //->wait(); } /** @@ -183,7 +183,7 @@ namespace asio { virtual size_t readsome( char* buf, size_t len ) { - auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) ); + auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) ).wait(); return r; } @@ -200,7 +200,7 @@ namespace asio { virtual size_t writesome( const char* buf, size_t len ) { - return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) ); + return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) ).wait(); } virtual void close(){ _stream->close(); } diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index 8ed6830..86e8a96 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -204,7 +204,7 @@ namespace fc { bool error()const { return m_prom->error(); } void cancel()const { if( m_prom ) m_prom->cancel(); } - bool canceled()const { return m_prom->canceled(); } + bool canceled()const { if( m_prom ) return m_prom->canceled(); else return true;} void cancel_and_wait() { @@ -261,7 +261,7 @@ namespace fc { } bool valid()const { return !!m_prom; } - bool canceled()const { return m_prom->canceled(); } + bool canceled()const { return m_prom ? m_prom->canceled() : true; } void cancel_and_wait() { diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp index 2a53c14..184faaa 100644 --- a/src/network/tcp_socket.cpp +++ b/src/network/tcp_socket.cpp @@ -23,21 +23,25 @@ namespace fc { { if( _sock.is_open() ) _sock.close(); + if( _read_in_progress.valid() ) try { _read_in_progress.wait(); } catch ( ... ) {} + if( _write_in_progress.valid() ) try { _write_in_progress.wait(); } catch ( ... ) {} } virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; + fc::future _write_in_progress; + fc::future _read_in_progress; boost::asio::ip::tcp::socket _sock; tcp_socket_io_hooks* _io_hooks; }; size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) { - return fc::asio::read_some(socket, boost::asio::buffer(buffer, length)); + return (_read_in_progress = fc::asio::read_some(socket, boost::asio::buffer(buffer, length))).wait(); } size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) { - return fc::asio::write_some(socket, boost::asio::buffer(buffer, length)); + return (_write_in_progress = fc::asio::write_some(socket, boost::asio::buffer(buffer, length))).wait(); } diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 17016fa..6968417 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -108,7 +108,7 @@ namespace fc { } thread::~thread() { - //slog( "my %p", my ); + //wlog( "my ${n}", ("n",name()) ); if( is_current() ) { wlog( "delete my" ); @@ -139,7 +139,7 @@ namespace fc { return; } - // wlog( "%s", my->name.c_str() ); + wlog( "${s}", ("s",name()) ); // We are quiting from our own thread... // break all promises, thread quit! @@ -155,8 +155,8 @@ namespace fc { cur = n; } if( my->blocked ) { - // wlog( "still blocking... whats up with that?"); - // debug( "on quit" ); + wlog( "still blocking... whats up with that?"); + debug( "on quit" ); } } BOOST_ASSERT( my->blocked == 0 );