From 13da4b0d6a715d744fec3a04989e1a0788c37b37 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Mon, 30 Jun 2014 10:50:50 -0400 Subject: [PATCH] improve socket error handling --- src/network/udt_socket.cpp | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp index 7b91afa..a065aac 100644 --- a/src/network/udt_socket.cpp +++ b/src/network/udt_socket.cpp @@ -77,6 +77,7 @@ namespace fc { } // while not canceled } // poll_loop + void notify_read( int udt_socket_id, const promise::ptr& p ) { @@ -110,6 +111,22 @@ namespace fc { } void remove( int udt_socket_id ) { + { synchronized(_read_promises_mutex) + auto read_itr = _read_promises.find( udt_socket_id ); + if( read_itr != _read_promises.end() ) + { + read_itr->second->set_exception( fc::copy_exception( fc::canceled_exception() ) ); + _read_promises.erase(read_itr); + } + } + { synchronized(_write_promises_mutex) + auto write_itr = _write_promises.find( udt_socket_id ); + if( write_itr != _write_promises.end() ) + { + write_itr->second->set_exception( fc::copy_exception( fc::canceled_exception() ) ); + _write_promises.erase(write_itr); + } + } UDT::epoll_remove_usock( _epoll_id, udt_socket_id ); } @@ -264,11 +281,14 @@ namespace fc { if( is_open() ) { default_epool_service().remove( _udt_socket_id ); - UDT::close( _udt_socket_id ); check_udt_errors(); _udt_socket_id = UDT::INVALID_SOCK; } + else + { + wlog( "already closed" ); + } } FC_CAPTURE_AND_RETHROW() } /// @} @@ -317,9 +337,9 @@ namespace fc { { try { if( _udt_socket_id != UDT::INVALID_SOCK ) { - default_epool_service().remove( _udt_socket_id ); UDT::close( _udt_socket_id ); check_udt_errors(); + default_epool_service().remove( _udt_socket_id ); _udt_socket_id = UDT::INVALID_SOCK; } } FC_CAPTURE_AND_RETHROW() } @@ -330,17 +350,17 @@ namespace fc { int namelen; sockaddr_in their_addr; - s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen ); - if( s._udt_socket_id == UDT::INVALID_SOCK ) + while( s._udt_socket_id == UDT::INVALID_SOCK ) { + s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen ); if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV ) { UDT::getlasterror().clear(); promise::ptr p(new promise("udt_server::accept")); default_epool_service().notify_read( _udt_socket_id, p ); p->wait(); - this->accept(s); + s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen ); } else check_udt_errors();