Expose enough functions in tcp_socket and tcp_server to allow listening and originating connections on the same port. So far, this seems to work on win32, other platforms untested.

Add a local_endpoint() function so we can find out which local interface a socket is bound to
This commit is contained in:
Eric Frias 2014-04-29 19:54:43 -04:00
parent 00edd3958c
commit aa111510f3
2 changed files with 74 additions and 26 deletions

View file

@ -15,13 +15,16 @@ namespace fc {
void connect_to( const fc::ip::endpoint& remote_endpoint ); void connect_to( const fc::ip::endpoint& remote_endpoint );
void connect_to( const fc::ip::endpoint& remote_endpoint, const fc::ip::endpoint& local_endpoint ); void connect_to( const fc::ip::endpoint& remote_endpoint, const fc::ip::endpoint& local_endpoint );
void enable_keep_alives(const fc::microseconds& interval); void enable_keep_alives(const fc::microseconds& interval);
fc::ip::endpoint remote_endpoint()const; void set_reuse_address(bool enable = true); // set SO_REUSEADDR
fc::ip::endpoint remote_endpoint() const;
fc::ip::endpoint local_endpoint() const;
void get( char& c ) void get( char& c )
{ {
read( &c, 1 ); read( &c, 1 );
} }
/// istream interface /// istream interface
/// @{ /// @{
virtual size_t readsome( char* buffer, size_t max ); virtual size_t readsome( char* buffer, size_t max );
@ -35,6 +38,7 @@ namespace fc {
virtual void close(); virtual void close();
/// @} /// @}
void open();
bool is_open()const; bool is_open()const;
private: private:
@ -57,6 +61,7 @@ namespace fc {
void close(); void close();
void accept( tcp_socket& s ); void accept( tcp_socket& s );
void set_reuse_address(bool enable = true); // set SO_REUSEADDR, call before listen
void listen( uint16_t port ); void listen( uint16_t port );
void listen( const fc::ip::endpoint& ep ); void listen( const fc::ip::endpoint& ep );
uint16_t get_port()const; uint16_t get_port()const;

View file

@ -14,12 +14,21 @@ namespace fc {
class tcp_socket::impl { class tcp_socket::impl {
public: public:
impl():_sock( fc::asio::default_io_service() ){} impl() :
_sock( fc::asio::default_io_service() )
{}
~impl(){ ~impl(){
if( _sock.is_open() ) _sock.close(); if( _sock.is_open() )
_sock.close();
} }
boost::asio::ip::tcp::socket _sock; boost::asio::ip::tcp::socket _sock;
}; };
void tcp_socket::open()
{
my->_sock.open(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0).protocol());
}
bool tcp_socket::is_open()const { bool tcp_socket::is_open()const {
return my->_sock.is_open(); return my->_sock.is_open();
} }
@ -46,11 +55,17 @@ namespace fc {
return fc::asio::write_some( my->_sock, boost::asio::buffer( buf, len ) ); return fc::asio::write_some( my->_sock, boost::asio::buffer( buf, len ) );
} }
fc::ip::endpoint tcp_socket::remote_endpoint()const fc::ip::endpoint tcp_socket::remote_endpoint()const
{ {
auto rep = my->_sock.remote_endpoint(); auto rep = my->_sock.remote_endpoint();
return fc::ip::endpoint(rep.address().to_v4().to_ulong(), rep.port() ); return fc::ip::endpoint(rep.address().to_v4().to_ulong(), rep.port() );
} }
fc::ip::endpoint tcp_socket::local_endpoint() const
{
auto boost_local_endpoint = my->_sock.local_endpoint();
return fc::ip::endpoint(boost_local_endpoint.address().to_v4().to_ulong(), boost_local_endpoint.port() );
}
size_t tcp_socket::readsome( char* buf, size_t len ) { size_t tcp_socket::readsome( char* buf, size_t len ) {
auto r = fc::asio::read_some( my->_sock, boost::asio::buffer( buf, len ) ); auto r = fc::asio::read_some( my->_sock, boost::asio::buffer( buf, len ) );
@ -62,10 +77,17 @@ namespace fc {
} }
void tcp_socket::connect_to( const fc::ip::endpoint& remote_endpoint, const fc::ip::endpoint& local_endpoint ) { void tcp_socket::connect_to( const fc::ip::endpoint& remote_endpoint, const fc::ip::endpoint& local_endpoint ) {
my->_sock = boost::asio::ip::tcp::socket(fc::asio::default_io_service(), try
boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4(local_endpoint.get_address()), {
local_endpoint.port())); my->_sock.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4(local_endpoint.get_address()),
fc::asio::tcp::connect(my->_sock, fc::asio::tcp::endpoint( boost::asio::ip::address_v4(remote_endpoint.get_address()), remote_endpoint.port() ) ); local_endpoint.port()));
}
catch (std::exception& except)
{
elog("Exception binding outgoing connection to desired local endpoint: ${what}", ("what", except.what()));
throw;
}
fc::asio::tcp::connect(my->_sock, fc::asio::tcp::endpoint(boost::asio::ip::address_v4(remote_endpoint.get_address()), remote_endpoint.port()));
} }
void tcp_socket::enable_keep_alives(const fc::microseconds& interval) void tcp_socket::enable_keep_alives(const fc::microseconds& interval)
@ -77,8 +99,8 @@ namespace fc {
#if defined _WIN32 || defined WIN32 || defined OS_WIN64 || defined _WIN64 || defined WIN64 || defined WINNT #if defined _WIN32 || defined WIN32 || defined OS_WIN64 || defined _WIN64 || defined WIN64 || defined WINNT
struct tcp_keepalive keepalive_settings; struct tcp_keepalive keepalive_settings;
keepalive_settings.onoff = 1; keepalive_settings.onoff = 1;
keepalive_settings.keepalivetime = interval.count() / fc::milliseconds(1).count(); keepalive_settings.keepalivetime = (ULONG)(interval.count() / fc::milliseconds(1).count());
keepalive_settings.keepaliveinterval = interval.count() / fc::milliseconds(1).count(); keepalive_settings.keepaliveinterval = (ULONG)(interval.count() / fc::milliseconds(1).count());
DWORD dwBytesRet = 0; DWORD dwBytesRet = 0;
if (WSAIoctl(my->_sock.native(), SIO_KEEPALIVE_VALS, &keepalive_settings, sizeof(keepalive_settings), if (WSAIoctl(my->_sock.native(), SIO_KEEPALIVE_VALS, &keepalive_settings, sizeof(keepalive_settings),
@ -109,13 +131,21 @@ namespace fc {
} }
} }
void tcp_socket::set_reuse_address(bool enable /* = true */)
{
FC_ASSERT(my->_sock.is_open());
boost::asio::socket_base::reuse_address option(enable);
my->_sock.set_option(option);
}
class tcp_server::impl { class tcp_server::impl {
public: public:
impl(uint16_t port) impl()
:_accept( fc::asio::default_io_service(), boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port) ){} :_accept( fc::asio::default_io_service() )
{
impl(const fc::ip::endpoint& ep ) _accept.open(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0).protocol());
:_accept( fc::asio::default_io_service(), boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4( ep.get_address()), ep.port()) ){} }
~impl(){ ~impl(){
try { try {
@ -130,8 +160,10 @@ namespace fc {
boost::asio::ip::tcp::acceptor _accept; boost::asio::ip::tcp::acceptor _accept;
}; };
void tcp_server::close() { void tcp_server::close() {
if( my && my->_accept.is_open() ) my->_accept.close(); if( my && my->_accept.is_open() )
delete my; my = nullptr; my->_accept.close();
delete my;
my = nullptr;
} }
tcp_server::tcp_server() tcp_server::tcp_server()
:my(nullptr) { :my(nullptr) {
@ -149,20 +181,31 @@ namespace fc {
fc::asio::tcp::accept( my->_accept, s.my->_sock ); fc::asio::tcp::accept( my->_accept, s.my->_sock );
} FC_RETHROW_EXCEPTIONS( warn, "Unable to accept connection on socket." ); } FC_RETHROW_EXCEPTIONS( warn, "Unable to accept connection on socket." );
} }
void tcp_server::set_reuse_address(bool enable /* = true */)
{
if( !my )
my = new impl;
boost::asio::ip::tcp::acceptor::reuse_address option(enable);
my->_accept.set_option(option);
}
void tcp_server::listen( uint16_t port ) void tcp_server::listen( uint16_t port )
{ {
if( my ) delete my; if( !my )
my = new impl(port); my = new impl;
my->_accept.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4(), port));
my->_accept.listen();
} }
void tcp_server::listen( const fc::ip::endpoint& ep ) void tcp_server::listen( const fc::ip::endpoint& ep )
{ {
if( my ) delete my; if( !my )
my = new impl(ep); my = new impl;
my->_accept.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::from_string((string)ep.get_address()), ep.port()));
my->_accept.listen();
} }
uint16_t tcp_server::get_port()const uint16_t tcp_server::get_port()const
{ {
FC_ASSERT( my != nullptr );
return my->_accept.local_endpoint().port(); return my->_accept.local_endpoint().port();
} }