diff --git a/include/fc/exception/exception.hpp b/include/fc/exception/exception.hpp index e6c686e..11bfbb1 100644 --- a/include/fc/exception/exception.hpp +++ b/include/fc/exception/exception.hpp @@ -31,7 +31,8 @@ namespace fc std_exception_code = 13, invalid_operation_exception_code = 14, unknown_host_exception_code = 15, - null_optional_code = 16 + null_optional_code = 16, + udt_error_code = 17 }; /** @@ -281,6 +282,7 @@ namespace fc FC_DECLARE_EXCEPTION( assert_exception, assert_exception_code, "Assert Exception" ); FC_DECLARE_EXCEPTION( eof_exception, eof_exception_code, "End Of File" ); FC_DECLARE_EXCEPTION( null_optional, null_optional_code, "null optional" ); + FC_DECLARE_EXCEPTION( udt_exception, udt_error_code, "UDT error" ); std::string except_str(); diff --git a/include/fc/network/ip.hpp b/include/fc/network/ip.hpp index 9760aeb..5a1bd0c 100644 --- a/include/fc/network/ip.hpp +++ b/include/fc/network/ip.hpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace fc { @@ -78,6 +79,7 @@ namespace fc { void to_variant( const ip::address& var, variant& vo ); void from_variant( const variant& var, ip::address& vo ); + namespace raw { template @@ -110,7 +112,9 @@ namespace fc { } } -} +} // namespace fc +FC_REFLECT_TYPENAME( fc::ip::address ) +FC_REFLECT_TYPENAME( fc::ip::endpoint ) namespace std { template<> diff --git a/include/fc/network/udt_socket.hpp b/include/fc/network/udt_socket.hpp new file mode 100644 index 0000000..b022b93 --- /dev/null +++ b/include/fc/network/udt_socket.hpp @@ -0,0 +1,49 @@ +#pragma once +#include +#include +#include +#include + +namespace fc { + namespace ip { class endpoint; } + + class udt_socket : public virtual iostream + { + public: + udt_socket(); + ~udt_socket(); + + void connect_to( const fc::ip::endpoint& remote_endpoint ); + + fc::ip::endpoint remote_endpoint() const; + fc::ip::endpoint local_endpoint() const; + + void get( char& c ) + { + read( &c, 1 ); + } + + + /// istream interface + /// @{ + virtual size_t readsome( char* buffer, size_t max ); + virtual bool eof()const; + /// @} + + /// ostream interface + /// @{ + virtual size_t writesome( const char* buffer, size_t len ); + virtual void flush(); + virtual void close(); + /// @} + + void open(); + bool is_open()const; + + private: + friend class udt_server; + int _udt_socket_id; + }; + typedef std::shared_ptr udt_socket_ptr; + +} // fc diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp new file mode 100644 index 0000000..8f89de5 --- /dev/null +++ b/src/network/udt_socket.cpp @@ -0,0 +1,120 @@ +#include + +namespace fc { + + void check_udt_errors() + { + UDT::ERRORINFO& error_info = UDT::getlasterror(); + if( error_info.getErrorCode() ) + { + std::string error_message = error_info.getErrorMessage(); + error_info.clear(); + FC_CAPTURE_AND_THROW( udt_exception, (error_message) ); + } + } + + udt_socket::udt_socket() + :_udt_socket_id( UDT::INVALID_SOCK ) + { + } + + ~udt_socket() + { + close(); + } + + void udt_socket::connect_to( const ip::endpoint& remote_endpoint ) + { try { + sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(remote_endpoint.port()); + serv_addr.sin_addr = htonl(remote_endpoint.address()); + + // connect to the server, implict bind + if (UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr))) + check_udt_errors(); + + } FC_CAPTURE_AND_RETHROW( (remote_endpoint) ) } + + ip::endpoint udt_socket::remote_endpoint() const + { try { + sockaddr_in peer_addr; + int peer_addr_size = sizeof(peer_addr); + int error_code = UDT::getpeername( _udt_socket_id, &peer_addr, &peer_addr_size ); + if( error_code == UDT::ERROR ) + check_udt_errors(); + return ip::endpoint( address( htonl( peer_addr.sin_addr ) ), htons(peer_addr.sin_port) ); + } FC_CAPTURE_AND_RETHROW() } + + ip::endpoint udt_socket::local_endpoint() const + { try { + sockaddr_in sock_addr; + int addr_size = sizeof(peer_addr); + int error_code = UDT::getsockname( _udt_socket_id, &sock_addr, &addr_size ); + if( error_code == UDT::ERROR ) + check_udt_errors(); + return ip::endpoint( address( htonl( sock_addr.sin_addr ) ), htons(sock_addr.sin_port) ); + } FC_CAPTURE_AND_RETHROW() } + + + /// @{ + size_t udt_socket::readsome( char* buffer, size_t max ) + { try { + auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 ); + if( bytes_read == UDT::ERROR ) + { + if( UDT::getlasterror().getCode() == UDT::EASYNCRCV ) + { + // create a future and post to epoll, wait on it, then + // call readsome recursively. + } + else + check_udt_errors(); + } + return bytes_read; + } FC_CAPTURE_AND_RETHROW( (max) ) } + + bool udt_socket::eof()const + { + // TODO... + return false; + } + /// @} + + /// ostream interface + /// @{ + size_t udt_socket::writesome( const char* buffer, size_t len ) + { + auto bytes_sent = UDT::send(_udt_socket_idl, buffer, len, 0); + + if( UDT::ERROR == bytes_sent ) + check_udt_errors(); + + if( bytes_sent == 0 ) + { + // schedule wait with epoll + } + return bytes_sent; + } + + void udt_socket::flush(){} + + void udt_socket::close() + { try { + UDT::close( _udt_socket_id ); + check_udt_errors(); + } FC_CAPTURE_AND_RETHROW() } + /// @} + + void udt_socket::open() + { + _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0); + } + + bool udt_socket::is_open()const + { + return _udt_socket_id != UDT::INVALID_SOCK; + } + + +}