sockets now wait for any pending write or read operations to finish before destructing

This commit is contained in:
Daniel Larimer 2014-07-17 17:03:25 -04:00
parent 01202aa709
commit eed62c8338
4 changed files with 18 additions and 14 deletions

View file

@ -79,11 +79,11 @@ namespace asio {
* @return the number of bytes read.
*/
template<typename AsyncReadStream, typename MutableBufferSequence>
size_t read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
{
promise<size_t>::ptr p(new promise<size_t>("fc::asio::async_read_some"));
s.async_read_some(buf, boost::bind(detail::read_write_handler, p, _1, _2));
return p->wait();
return p;//->wait();
}
template<typename AsyncReadStream, typename MutableBufferSequence>
@ -117,10 +117,10 @@ namespace asio {
* @return the number of bytes written
*/
template<typename AsyncWriteStream, typename ConstBufferSequence>
size_t write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
promise<size_t>::ptr p(new promise<size_t>("fc::asio::write_some"));
s.async_write_some( buf, boost::bind( detail::read_write_handler, p, _1, _2 ) );
return p->wait();
return p; //->wait();
}
/**
@ -183,7 +183,7 @@ namespace asio {
virtual size_t readsome( char* buf, size_t len )
{
auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) );
auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) ).wait();
return r;
}
@ -200,7 +200,7 @@ namespace asio {
virtual size_t writesome( const char* buf, size_t len )
{
return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) );
return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) ).wait();
}
virtual void close(){ _stream->close(); }

View file

@ -204,7 +204,7 @@ namespace fc {
bool error()const { return m_prom->error(); }
void cancel()const { if( m_prom ) m_prom->cancel(); }
bool canceled()const { return m_prom->canceled(); }
bool canceled()const { if( m_prom ) return m_prom->canceled(); else return true;}
void cancel_and_wait()
{
@ -261,7 +261,7 @@ namespace fc {
}
bool valid()const { return !!m_prom; }
bool canceled()const { return m_prom->canceled(); }
bool canceled()const { return m_prom ? m_prom->canceled() : true; }
void cancel_and_wait()
{

View file

@ -23,21 +23,25 @@ namespace fc {
{
if( _sock.is_open() )
_sock.close();
if( _read_in_progress.valid() ) try { _read_in_progress.wait(); } catch ( ... ) {}
if( _write_in_progress.valid() ) try { _write_in_progress.wait(); } catch ( ... ) {}
}
virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
fc::future<size_t> _write_in_progress;
fc::future<size_t> _read_in_progress;
boost::asio::ip::tcp::socket _sock;
tcp_socket_io_hooks* _io_hooks;
};
size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
{
return fc::asio::read_some(socket, boost::asio::buffer(buffer, length));
return (_read_in_progress = fc::asio::read_some(socket, boost::asio::buffer(buffer, length))).wait();
}
size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
{
return fc::asio::write_some(socket, boost::asio::buffer(buffer, length));
return (_write_in_progress = fc::asio::write_some(socket, boost::asio::buffer(buffer, length))).wait();
}

View file

@ -108,7 +108,7 @@ namespace fc {
}
thread::~thread() {
//slog( "my %p", my );
//wlog( "my ${n}", ("n",name()) );
if( is_current() )
{
wlog( "delete my" );
@ -139,7 +139,7 @@ namespace fc {
return;
}
// wlog( "%s", my->name.c_str() );
wlog( "${s}", ("s",name()) );
// We are quiting from our own thread...
// break all promises, thread quit!
@ -155,8 +155,8 @@ namespace fc {
cur = n;
}
if( my->blocked ) {
// wlog( "still blocking... whats up with that?");
// debug( "on quit" );
wlog( "still blocking... whats up with that?");
debug( "on quit" );
}
}
BOOST_ASSERT( my->blocked == 0 );