diff --git a/.gitignore b/.gitignore index b685cae..d6f93d1 100644 --- a/.gitignore +++ b/.gitignore @@ -33,16 +33,23 @@ ZERO_CHECK Debug/ Release/ +CMakeCache.txt CMakeFiles Makefile -cmake_install.cmake +*.cmake *.cbp libfc.a libfc_debug.a +*.sw* + fc_automoc.cpp git_revision.cpp GitSHA3.cpp -*.sw* +lzma_test +ntp_test +task_cancel_test +udt_client +udt_server diff --git a/CMakeLists.txt b/CMakeLists.txt index 4d383a4..3b0ed56 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,7 +33,7 @@ endif() SET (ORIGINAL_LIB_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) SET(BOOST_COMPONENTS) -LIST(APPEND BOOST_COMPONENTS thread date_time system filesystem program_options signals serialization chrono unit_test_framework context locale) +LIST(APPEND BOOST_COMPONENTS thread date_time system filesystem program_options signals serialization chrono unit_test_framework context locale iostreams) IF( WIN32 ) MESSAGE(STATUS "Configuring fc to build on Win32") @@ -147,10 +147,10 @@ set( fc_sources src/crypto/rand.cpp src/crypto/salsa20.cpp src/crypto/scrypt.cpp -# commented out until committed -# src/crypto/romix.cpp + src/crypto/romix.cpp src/network/tcp_socket.cpp src/network/udp_socket.cpp + src/network/udt_socket.cpp src/network/http/http_connection.cpp src/network/http/http_server.cpp src/network/ntp.cpp @@ -182,6 +182,7 @@ list(APPEND sources "${CMAKE_CURRENT_BINARY_DIR}/git_revision.cpp") list(APPEND sources ${fc_headers}) add_subdirectory( vendor/easylzma ) +add_subdirectory( vendor/scrypt-jane ) add_subdirectory( vendor/udt4 ) setup_library( fc SOURCES ${sources} LIBRARY_TYPE STATIC DONT_INSTALL_LIBRARY ) @@ -199,10 +200,9 @@ ELSE() SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99 -Wall") IF(APPLE) - SET(CMAKE_CXX_FLAGS "${CMAKE_C_FLAGS} -std=c++11 -stdlib=libc++ -Wall") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -stdlib=libc++ -Wall") ELSE() - target_compile_options(fc PUBLIC -std=c++11 -Wall -fnon-call-exceptions) - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -fnon-call-exceptions") ENDIF() ENDIF() @@ -220,18 +220,24 @@ target_include_directories(fc ${CMAKE_CURRENT_SOURCE_DIR}/vendor/udt4/src ) -target_link_libraries( fc PUBLIC easylzma_static ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${ZLIB_LIBRARIES} ${PLATFORM_SPECIFIC_LIBS} ${RPCRT4} ${CMAKE_DL_LIBS} ${rt_library}) +target_link_libraries( fc PUBLIC easylzma_static scrypt udt ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${ZLIB_LIBRARIES} ${PLATFORM_SPECIFIC_LIBS} ${RPCRT4} ${CMAKE_DL_LIBS} ${rt_library}) add_executable( ntp_test ntp_test.cpp ) target_link_libraries( ntp_test fc ) +add_executable( task_cancel_test tests/task_cancel.cpp ) +target_link_libraries( task_cancel_test fc ) + #include_directories( vendor/udt4/src ) -#add_executable( udt_server tests/udt_server.cpp ) -#target_link_libraries( udt_server fc udt ) +add_executable( udt_server tests/udts.cpp ) +target_link_libraries( udt_server fc udt ) -#add_executable( udt_client tests/udt_client.cpp ) -#target_link_libraries( udt_client fc udt ) +add_executable( udt_client tests/udtc.cpp ) +target_link_libraries( udt_client fc udt ) + +add_executable( lzma_test tests/lzma_test.cpp ) +target_link_libraries( lzma_test fc ) #add_executable( test_compress tests/compress.cpp ) #target_link_libraries( test_compress fc ) diff --git a/include/fc/asio.hpp b/include/fc/asio.hpp index 8723290..ef8f6b9 100644 --- a/include/fc/asio.hpp +++ b/include/fc/asio.hpp @@ -79,11 +79,11 @@ namespace asio { * @return the number of bytes read. */ template - size_t read_some(AsyncReadStream& s, const MutableBufferSequence& buf) + future read_some(AsyncReadStream& s, const MutableBufferSequence& buf) { promise::ptr p(new promise("fc::asio::async_read_some")); s.async_read_some(buf, boost::bind(detail::read_write_handler, p, _1, _2)); - return p->wait(); + return p;//->wait(); } template @@ -117,10 +117,10 @@ namespace asio { * @return the number of bytes written */ template - size_t write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { + future write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) { promise::ptr p(new promise("fc::asio::write_some")); s.async_write_some( buf, boost::bind( detail::read_write_handler, p, _1, _2 ) ); - return p->wait(); + return p; //->wait(); } /** @@ -183,7 +183,7 @@ namespace asio { virtual size_t readsome( char* buf, size_t len ) { - auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) ); + auto r = fc::asio::read_some(*_stream, boost::asio::buffer(buf, len) ).wait(); return r; } @@ -200,7 +200,7 @@ namespace asio { virtual size_t writesome( const char* buf, size_t len ) { - return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) ); + return fc::asio::write_some(*_stream, boost::asio::const_buffers_1(buf, len) ).wait(); } virtual void close(){ _stream->close(); } diff --git a/include/fc/compress/lzma.hpp b/include/fc/compress/lzma.hpp index a6d4f81..44d17b7 100644 --- a/include/fc/compress/lzma.hpp +++ b/include/fc/compress/lzma.hpp @@ -1,9 +1,19 @@ #pragma once + +#include #include namespace fc { - std::vector lzma_compress( const std::vector& in ); - std::vector lzma_decompress( const std::vector& compressed ); +std::vector lzma_compress( const std::vector& in ); +std::vector lzma_decompress( const std::vector& compressed ); + +void lzma_compress_file( const path& src_path, + const path& dst_path, + unsigned char level = 5, + unsigned int dict_size = (1 << 20) ); + +void lzma_decompress_file( const path& src_path, + const path& dst_path ); } // namespace fc diff --git a/include/fc/crypto/aes.hpp b/include/fc/crypto/aes.hpp index 70fdc7c..f0c7616 100644 --- a/include/fc/crypto/aes.hpp +++ b/include/fc/crypto/aes.hpp @@ -37,12 +37,12 @@ namespace fc { fc::fwd my; }; - int aes_encrypt(unsigned char *plaintext, int plaintext_len, unsigned char *key, - unsigned char *iv, unsigned char *ciphertext); - int aes_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, - unsigned char *iv, unsigned char *plaintext); - int aes_cfb_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, - unsigned char *iv, unsigned char *plaintext); + unsigned aes_encrypt(unsigned char *plaintext, int plaintext_len, unsigned char *key, + unsigned char *iv, unsigned char *ciphertext); + unsigned aes_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, + unsigned char *iv, unsigned char *plaintext); + unsigned aes_cfb_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, + unsigned char *iv, unsigned char *plaintext); std::vector aes_encrypt( const fc::sha512& key, const std::vector& plain_text ); std::vector aes_decrypt( const fc::sha512& key, const std::vector& cipher_text ); diff --git a/include/fc/crypto/romix.hpp b/include/fc/crypto/romix.hpp index e46398d..21c3bc0 100644 --- a/include/fc/crypto/romix.hpp +++ b/include/fc/crypto/romix.hpp @@ -43,6 +43,7 @@ //////////////////////////////////////////////////////////////////////////////// #pragma once +#include #include namespace fc { @@ -56,21 +57,21 @@ namespace fc { class romix { public: - romix(u_int32_t memReqts, u_int32_t numIter, std::string salt); + romix(uint32_t memReqts, uint32_t numIter, std::string salt); std::string deriveKey_OneIter(std::string const & password); std::string deriveKey(std::string const & password); private: - u_int32_t hashOutputBytes_; - u_int32_t kdfOutputBytes_; // size of final key data + uint32_t hashOutputBytes_; + uint32_t kdfOutputBytes_; // size of final key data - u_int32_t memoryReqtBytes_; - u_int32_t sequenceCount_; + uint32_t memoryReqtBytes_; + uint32_t sequenceCount_; std::string salt_; // prob not necessary amidst numIter, memReqts // but I guess it can't hurt - u_int32_t numIterations_; // We set the ROMIX params for a given memory + uint32_t numIterations_; // We set the ROMIX params for a given memory // req't. Then run it numIter times to meet // the computation-time req't }; diff --git a/include/fc/crypto/scrypt.hpp b/include/fc/crypto/scrypt.hpp index 53d44df..b74708a 100644 --- a/include/fc/crypto/scrypt.hpp +++ b/include/fc/crypto/scrypt.hpp @@ -1,10 +1,9 @@ #pragma once -#include #include namespace fc { - void scrypt_derive_key ( const std::vector &passphrase, const std::vector &salt, - unsigned int n, unsigned int r, unsigned int p, std::vector &key ); + void scrypt_derive_key( const std::vector& passphrase, const std::vector& salt, + unsigned int n, unsigned int r, unsigned int p, std::vector& key ); } // namespace fc diff --git a/include/fc/exception/exception.hpp b/include/fc/exception/exception.hpp index e6c686e..11bfbb1 100644 --- a/include/fc/exception/exception.hpp +++ b/include/fc/exception/exception.hpp @@ -31,7 +31,8 @@ namespace fc std_exception_code = 13, invalid_operation_exception_code = 14, unknown_host_exception_code = 15, - null_optional_code = 16 + null_optional_code = 16, + udt_error_code = 17 }; /** @@ -281,6 +282,7 @@ namespace fc FC_DECLARE_EXCEPTION( assert_exception, assert_exception_code, "Assert Exception" ); FC_DECLARE_EXCEPTION( eof_exception, eof_exception_code, "End Of File" ); FC_DECLARE_EXCEPTION( null_optional, null_optional_code, "null optional" ); + FC_DECLARE_EXCEPTION( udt_exception, udt_error_code, "UDT error" ); std::string except_str(); diff --git a/include/fc/interprocess/iprocess.hpp b/include/fc/interprocess/iprocess.hpp index 0f8671e..c7f8c4e 100644 --- a/include/fc/interprocess/iprocess.hpp +++ b/include/fc/interprocess/iprocess.hpp @@ -32,7 +32,7 @@ namespace fc * @return *this */ virtual iprocess& exec( const path& exe, std::vector args, - const path& work_dir = path(), exec_opts opts = open_all ) = 0; + const path& work_dir = path(), int opts = open_all ) = 0; /** * @return blocks until the process exits diff --git a/include/fc/interprocess/process.hpp b/include/fc/interprocess/process.hpp index 28333ef..777c154 100644 --- a/include/fc/interprocess/process.hpp +++ b/include/fc/interprocess/process.hpp @@ -18,7 +18,7 @@ namespace fc { virtual iprocess& exec( const fc::path& exe, std::vector args, const fc::path& work_dir = fc::path(), - exec_opts opts = open_all ); + int opts = open_all ); virtual int result(const microseconds& timeout = microseconds::maximum()); diff --git a/include/fc/io/varint.hpp b/include/fc/io/varint.hpp index 3536d4e..e289f87 100644 --- a/include/fc/io/varint.hpp +++ b/include/fc/io/varint.hpp @@ -19,12 +19,21 @@ struct unsigned_int { uint32_t value; - friend bool operator==( const unsigned_int& i, const uint32_t& v ) { return v == i.value; } - friend bool operator!=( const unsigned_int& i, const uint32_t& v ) { return v != i.value; } - friend bool operator<( const unsigned_int& i, const uint32_t& v ) { return v < i.value; } - friend bool operator>=( const unsigned_int& i, const uint32_t& v ) { return v >= i.value; } - friend bool operator<( const unsigned_int& i, const unsigned_int& v ) { return v < i.value; } - friend bool operator>=( const unsigned_int& i, const unsigned_int& v ) { return v >= i.value; } + friend bool operator==( const unsigned_int& i, const uint32_t& v ) { return i.value == v; } + friend bool operator==( const uint32_t& i, const unsigned_int& v ) { return i == v.value; } + friend bool operator==( const unsigned_int& i, const unsigned_int& v ) { return i.value == v.value; } + + friend bool operator!=( const unsigned_int& i, const uint32_t& v ) { return i.value != v; } + friend bool operator!=( const uint32_t& i, const unsigned_int& v ) { return i != v.value; } + friend bool operator!=( const unsigned_int& i, const unsigned_int& v ) { return i.value != v.value; } + + friend bool operator<( const unsigned_int& i, const uint32_t& v ) { return i.value < v; } + friend bool operator<( const uint32_t& i, const unsigned_int& v ) { return i < v.value; } + friend bool operator<( const unsigned_int& i, const unsigned_int& v ) { return i.value < v.value; } + + friend bool operator>=( const unsigned_int& i, const uint32_t& v ) { return i.value >= v; } + friend bool operator>=( const uint32_t& i, const unsigned_int& v ) { return i >= v.value; } + friend bool operator>=( const unsigned_int& i, const unsigned_int& v ) { return i.value >= v.value; } }; /** @@ -41,6 +50,22 @@ struct signed_int { signed_int& operator++(){ ++value; return *this; } int32_t value; + + friend bool operator==( const signed_int& i, const int32_t& v ) { return i.value == v; } + friend bool operator==( const int32_t& i, const signed_int& v ) { return i == v.value; } + friend bool operator==( const signed_int& i, const signed_int& v ) { return i.value == v.value; } + + friend bool operator!=( const signed_int& i, const int32_t& v ) { return i.value != v; } + friend bool operator!=( const int32_t& i, const signed_int& v ) { return i != v.value; } + friend bool operator!=( const signed_int& i, const signed_int& v ) { return i.value != v.value; } + + friend bool operator<( const signed_int& i, const int32_t& v ) { return i.value < v; } + friend bool operator<( const int32_t& i, const signed_int& v ) { return i < v.value; } + friend bool operator<( const signed_int& i, const signed_int& v ) { return i.value < v.value; } + + friend bool operator>=( const signed_int& i, const int32_t& v ) { return i.value >= v; } + friend bool operator>=( const int32_t& i, const signed_int& v ) { return i >= v.value; } + friend bool operator>=( const signed_int& i, const signed_int& v ) { return i.value >= v.value; } }; class variant; @@ -64,4 +89,13 @@ namespace std return std::hash()(a.value); } }; + template<> + struct hash + { + public: + size_t operator()(const fc::signed_int &a) const + { + return std::hash()(a.value); + } + }; } diff --git a/include/fc/log/file_appender.hpp b/include/fc/log/file_appender.hpp index 8adaf6a..af7aa62 100644 --- a/include/fc/log/file_appender.hpp +++ b/include/fc/log/file_appender.hpp @@ -1,12 +1,12 @@ #pragma once + +#include #include #include -#include +#include namespace fc { -class varaint; - class file_appender : public appender { public: struct config { @@ -16,10 +16,14 @@ class file_appender : public appender { fc::path filename; bool flush; bool truncate; + bool rotate; + microseconds rotation_interval; + microseconds rotation_limit; + bool rotation_compression; }; file_appender( const variant& args ); ~file_appender(); - virtual void log( const log_message& m ); + virtual void log( const log_message& m )override; private: class impl; @@ -28,4 +32,5 @@ class file_appender : public appender { } // namespace fc #include -FC_REFLECT( fc::file_appender::config, (format)(filename)(flush)(truncate) ) +FC_REFLECT( fc::file_appender::config, + (format)(filename)(flush)(truncate)(rotate)(rotation_interval)(rotation_limit)(rotation_compression) ) diff --git a/include/fc/network/ip.hpp b/include/fc/network/ip.hpp index 9760aeb..5a1bd0c 100644 --- a/include/fc/network/ip.hpp +++ b/include/fc/network/ip.hpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace fc { @@ -78,6 +79,7 @@ namespace fc { void to_variant( const ip::address& var, variant& vo ); void from_variant( const variant& var, ip::address& vo ); + namespace raw { template @@ -110,7 +112,9 @@ namespace fc { } } -} +} // namespace fc +FC_REFLECT_TYPENAME( fc::ip::address ) +FC_REFLECT_TYPENAME( fc::ip::endpoint ) namespace std { template<> diff --git a/include/fc/network/tcp_socket.hpp b/include/fc/network/tcp_socket.hpp index f9dbce0..1159654 100644 --- a/include/fc/network/tcp_socket.hpp +++ b/include/fc/network/tcp_socket.hpp @@ -51,7 +51,7 @@ namespace fc { #ifdef _WIN64 fc::fwd my; #else - fc::fwd my; + fc::fwd my; #endif }; typedef std::shared_ptr tcp_socket_ptr; diff --git a/include/fc/network/udt_socket.hpp b/include/fc/network/udt_socket.hpp new file mode 100644 index 0000000..aa1dd49 --- /dev/null +++ b/include/fc/network/udt_socket.hpp @@ -0,0 +1,67 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace fc { + namespace ip { class endpoint; } + + class udt_socket : public virtual iostream, public noncopyable + { + public: + udt_socket(); + ~udt_socket(); + + void bind( const fc::ip::endpoint& local_endpoint ); + void connect_to( const fc::ip::endpoint& remote_endpoint ); + + fc::ip::endpoint remote_endpoint() const; + fc::ip::endpoint local_endpoint() const; + + void get( char& c ) + { + read( &c, 1 ); + } + + + /// istream interface + /// @{ + virtual size_t readsome( char* buffer, size_t max ); + virtual bool eof()const; + /// @} + + /// ostream interface + /// @{ + virtual size_t writesome( const char* buffer, size_t len ); + virtual void flush(); + virtual void close(); + /// @} + + void open(); + bool is_open()const; + + private: + friend class udt_server; + int _udt_socket_id; + }; + typedef std::shared_ptr udt_socket_ptr; + + class udt_server : public noncopyable + { + public: + udt_server(); + ~udt_server(); + + void close(); + void accept( udt_socket& s ); + + void listen( const fc::ip::endpoint& ep ); + fc::ip::endpoint local_endpoint() const; + + private: + int _udt_socket_id; + }; + +} // fc diff --git a/include/fc/noncopyable.hpp b/include/fc/noncopyable.hpp new file mode 100644 index 0000000..87fad6b --- /dev/null +++ b/include/fc/noncopyable.hpp @@ -0,0 +1,14 @@ +#pragma once + +namespace fc +{ + class noncopyable + { + public: + noncopyable(){} + private: + noncopyable( const noncopyable& ) = delete; + noncopyable& operator=( const noncopyable& ) = delete; + }; +} + diff --git a/include/fc/reflect/typename.hpp b/include/fc/reflect/typename.hpp index 64f57bc..7b3276a 100644 --- a/include/fc/reflect/typename.hpp +++ b/include/fc/reflect/typename.hpp @@ -4,6 +4,8 @@ namespace fc { class value; class exception; + namespace ip { class address; } + template class get_typename{}; template<> struct get_typename { static const char* name() { return "int32_t"; } }; template<> struct get_typename { static const char* name() { return "int64_t"; } }; diff --git a/include/fc/rpc/json_connection.hpp b/include/fc/rpc/json_connection.hpp index 9b4adc7..9a4b376 100644 --- a/include/fc/rpc/json_connection.hpp +++ b/include/fc/rpc/json_connection.hpp @@ -107,6 +107,17 @@ namespace fc { namespace rpc { const variant& a7 ); + future async_call( const fc::string& method, + const variant& a1, + const variant& a2, + const variant& a3, + const variant& a4, + const variant& a5, + const variant& a6, + const variant& a7, + const variant& a8 + ); + template Result call( const fc::string& method, const variants& args, diff --git a/include/fc/signals.hpp b/include/fc/signals.hpp index 35f9a60..632f4f4 100644 --- a/include/fc/signals.hpp +++ b/include/fc/signals.hpp @@ -8,6 +8,8 @@ namespace fc { #if !defined(BOOST_NO_TEMPLATE_ALIASES) template using signal = boost::signals2::signal; + + using scoped_connection = boost::signals2::scoped_connection; #else /** Workaround for missing Template Aliases feature in the VS 2012. \warning Class defined below cannot have defined constructor (even base class has it) diff --git a/include/fc/thread/future.hpp b/include/fc/thread/future.hpp index cf57a24..86e8a96 100644 --- a/include/fc/thread/future.hpp +++ b/include/fc/thread/future.hpp @@ -184,14 +184,15 @@ namespace fc { /// @post ready() /// @throws timeout const T& wait( const microseconds& timeout = microseconds::maximum() )const { - return m_prom->wait(timeout); + return m_prom->wait(timeout); } /// @pre valid() /// @post ready() /// @throws timeout const T& wait_until( const time_point& tp )const { - return m_prom->wait_until(tp); + if( m_prom ) + return m_prom->wait_until(tp); } bool valid()const { return !!m_prom; } @@ -203,7 +204,16 @@ namespace fc { bool error()const { return m_prom->error(); } void cancel()const { if( m_prom ) m_prom->cancel(); } - bool canceled()const { return m_prom->canceled(); } + bool canceled()const { if( m_prom ) return m_prom->canceled(); else return true;} + + void cancel_and_wait() + { + if( valid() ) + { + cancel(); + wait(); + } + } /** * @pre valid() @@ -239,7 +249,8 @@ namespace fc { /// @post ready() /// @throws timeout void wait( const microseconds& timeout = microseconds::maximum() ){ - m_prom->wait(timeout); + if( m_prom ) + m_prom->wait(timeout); } /// @pre valid() @@ -250,7 +261,13 @@ namespace fc { } bool valid()const { return !!m_prom; } - bool canceled()const { return m_prom->canceled(); } + bool canceled()const { return m_prom ? m_prom->canceled() : true; } + + void cancel_and_wait() + { + cancel(); + wait(); + } /// @pre valid() bool ready()const { return m_prom->ready(); } diff --git a/include/fc/thread/thread.hpp b/include/fc/thread/thread.hpp index e95a7d1..a133ba4 100644 --- a/include/fc/thread/thread.hpp +++ b/include/fc/thread/thread.hpp @@ -9,7 +9,7 @@ namespace fc { class thread { public: - thread( const char* name = "" ); + thread( const std::string& name = "" ); thread( thread&& m ); thread& operator=(thread&& t ); @@ -175,6 +175,10 @@ namespace fc { auto async( Functor&& f, const char* desc ="", priority prio = priority()) -> fc::future { return fc::thread::current().async( fc::forward(f), desc, prio ); } + template + auto schedule( Functor&& f, const fc::time_point& t, const char* desc ="", priority prio = priority()) -> fc::future { + return fc::thread::current().schedule( fc::forward(f), t, desc, prio ); + } } // end namespace fc diff --git a/include/fc/time.hpp b/include/fc/time.hpp index ca851ec..ae0d48d 100644 --- a/include/fc/time.hpp +++ b/include/fc/time.hpp @@ -102,7 +102,8 @@ namespace fc { friend bool operator != ( const time_point_sec& a, const time_point_sec& b ) { return a.utc_seconds != b.utc_seconds; } time_point_sec& operator += ( uint32_t m ) { utc_seconds+=m; return *this; } time_point_sec& operator -= ( uint32_t m ) { utc_seconds-=m; return *this; } - time_point_sec operator+( uint32_t offset ) { return time_point_sec(utc_seconds + offset); } + time_point_sec operator +( uint32_t offset )const { return time_point_sec(utc_seconds + offset); } + time_point_sec operator -( uint32_t offset )const { return time_point_sec(utc_seconds - offset); } friend time_point operator - ( const time_point_sec& t, const microseconds& m ) { return time_point(t) - m; } friend microseconds operator - ( const time_point_sec& t, const time_point_sec& m ) { return time_point(t) - time_point(m); } diff --git a/ntp_test.cpp b/ntp_test.cpp index 5d02167..b3937b1 100644 --- a/ntp_test.cpp +++ b/ntp_test.cpp @@ -16,6 +16,7 @@ int main( int argc, char** argv ) auto hours = delta.count() / 1000000 / 60 / 60; auto seconds = delta.count() / 1000000; auto msec= delta.count() / 1000; + idump( (fc::time_point::now() ) ); idump( (ntp_time)(delta)(msec)(seconds)(minutes)(hours) ); } else diff --git a/src/compress/lzma.cpp b/src/compress/lzma.cpp index 60db130..fb669d0 100644 --- a/src/compress/lzma.cpp +++ b/src/compress/lzma.cpp @@ -1,8 +1,11 @@ +#include #include #include - +#include #include +#include + namespace fc { std::vector lzma_compress(const std::vector& in) @@ -50,4 +53,148 @@ std::vector lzma_decompress( const std::vector& compressed ) return out; } +struct lzma_file_ctx +{ + const unsigned char* src_buf; + size_t src_len; + + path dst_path; +}; + +static int lzma_file_input_callback( void* input_ctx, void* input_buf, size_t* input_len ) +{ + FC_ASSERT( input_ctx != NULL ); + FC_ASSERT( input_buf != NULL ); + + const auto ctx = ( struct lzma_file_ctx* )input_ctx; + const auto size = ( ctx->src_len < *input_len ) ? ctx->src_len : *input_len; + + if( size > 0 ) + { + memcpy( input_buf, ( void * )ctx->src_buf, size ); + ctx->src_buf += size; + ctx->src_len -= size; + } + + *input_len = size; + + return 0; +} + +static size_t lzma_file_output_callback( void* output_ctx, const void* output_buf, size_t output_len ) +{ + FC_ASSERT( output_ctx != NULL ); + FC_ASSERT( output_buf != NULL ); + + const auto ctx = ( struct lzma_file_ctx* )output_ctx; + + if( output_len > 0 ) + { + size_t dst_len = 0; + if( !exists( ctx->dst_path ) ) + { + ofstream fs( ctx->dst_path ); + fs.close(); + } + else + { + dst_len = file_size( ctx->dst_path ); + } + + resize_file( ctx->dst_path, dst_len + output_len ); + + boost::iostreams::mapped_file_sink dst_file; + dst_file.open( ctx->dst_path.string() ); + FC_ASSERT( dst_file.is_open() ); + + memcpy( ( void* )(dst_file.data() + dst_len), output_buf, output_len); + + dst_file.close(); + } + + return output_len; +} + +void lzma_compress_file( const path& src_path, + const path& dst_path, + unsigned char level, + unsigned int dict_size ) +{ + FC_ASSERT( exists( src_path ) ); + FC_ASSERT( !exists( dst_path ) ); + + boost::iostreams::mapped_file_source src_file; + src_file.open( src_path.string() ); + FC_ASSERT( src_file.is_open() ); + + elzma_compress_handle handle = NULL; + handle = elzma_compress_alloc(); + FC_ASSERT( handle != NULL ); + + struct lzma_file_ctx ctx; + ctx.src_buf = ( const unsigned char* )src_file.data(); + ctx.src_len = src_file.size(); + ctx.dst_path = dst_path; + + auto rc = elzma_compress_config( handle, + ELZMA_LC_DEFAULT, + ELZMA_LP_DEFAULT, + ELZMA_PB_DEFAULT, + level, + dict_size, + elzma_file_format::ELZMA_lzma, + ctx.src_len ); + + try + { + FC_ASSERT( rc == ELZMA_E_OK ); + } + catch( ... ) + { + elzma_compress_free( &handle ); + throw; + } + + rc = elzma_compress_run( handle, + lzma_file_input_callback, + ( void * )&ctx, + lzma_file_output_callback, + ( void * )&ctx, + NULL, + NULL ); + + elzma_compress_free( &handle ); + FC_ASSERT( rc == ELZMA_E_OK ); +} + +void lzma_decompress_file( const path& src_path, + const path& dst_path ) +{ + FC_ASSERT( exists( src_path ) ); + FC_ASSERT( !exists( dst_path ) ); + + boost::iostreams::mapped_file_source src_file; + src_file.open( src_path.string() ); + FC_ASSERT( src_file.is_open() ); + + elzma_decompress_handle handle = NULL; + handle = elzma_decompress_alloc(); + FC_ASSERT( handle != NULL ); + + struct lzma_file_ctx ctx; + ctx.src_buf = ( const unsigned char* )src_file.data(); + ctx.src_len = src_file.size(); + ctx.dst_path = dst_path; + + auto rc = elzma_decompress_run( handle, + lzma_file_input_callback, + ( void * )&ctx, + lzma_file_output_callback, + ( void * )&ctx, + elzma_file_format::ELZMA_lzma ); + + elzma_decompress_free( &handle ); + FC_ASSERT( rc == ELZMA_E_OK ); +} + } // namespace fc diff --git a/src/crypto/aes.cpp b/src/crypto/aes.cpp index 8e51519..b115805 100644 --- a/src/crypto/aes.cpp +++ b/src/crypto/aes.cpp @@ -8,6 +8,18 @@ #include +#include +#include +#include +#ifndef OPENSSL_THREADS +# error "OpenSSL must be configured to support threads" +#endif +#include + +#if defined(_MSC_VER) +# include +#endif + namespace fc { struct aes_encoder::impl @@ -157,13 +169,13 @@ uint32_t aes_decoder::final_decode( char* plaintext ) /** example method from wiki.opensslfoundation.com */ -int aes_encrypt(unsigned char *plaintext, int plaintext_len, unsigned char *key, - unsigned char *iv, unsigned char *ciphertext) +unsigned aes_encrypt(unsigned char *plaintext, int plaintext_len, unsigned char *key, + unsigned char *iv, unsigned char *ciphertext) { evp_cipher_ctx ctx( EVP_CIPHER_CTX_new() ); int len = 0; - int ciphertext_len = 0; + unsigned ciphertext_len = 0; /* Create and initialise the context */ if(!ctx) @@ -206,12 +218,12 @@ int aes_encrypt(unsigned char *plaintext, int plaintext_len, unsigned char *key, return ciphertext_len; } -int aes_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, - unsigned char *iv, unsigned char *plaintext) +unsigned aes_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, + unsigned char *iv, unsigned char *plaintext) { evp_cipher_ctx ctx( EVP_CIPHER_CTX_new() ); int len = 0; - int plaintext_len = 0; + unsigned plaintext_len = 0; /* Create and initialise the context */ if(!ctx) @@ -255,12 +267,12 @@ int aes_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *ke return plaintext_len; } -int aes_cfb_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, - unsigned char *iv, unsigned char *plaintext) +unsigned aes_cfb_decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, + unsigned char *iv, unsigned char *plaintext) { evp_cipher_ctx ctx( EVP_CIPHER_CTX_new() ); int len = 0; - int plaintext_len = 0; + unsigned plaintext_len = 0; /* Create and initialise the context */ if(!ctx) @@ -308,8 +320,8 @@ std::vector aes_encrypt( const fc::sha512& key, const std::vector& p { std::vector cipher_text(plain_text.size()+16); auto cipher_len = aes_encrypt( (unsigned char*)plain_text.data(), plain_text.size(), - (unsigned char*)&key, ((unsigned char*)&key)+32, - (unsigned char*)cipher_text.data() ); + (unsigned char*)&key, ((unsigned char*)&key)+32, + (unsigned char*)cipher_text.data() ); FC_ASSERT( cipher_len <= cipher_text.size() ); cipher_text.resize(cipher_len); return cipher_text; @@ -365,4 +377,63 @@ std::vector aes_load( const fc::path& file, const fc::sha512& key ) return aes_decrypt( key, cipher ); } FC_RETHROW_EXCEPTIONS( warn, "", ("file",file) ) } +/* This stuff has to go somewhere, I guess this is as good a place as any... + OpenSSL isn't thread-safe unless you give it access to some mutexes, + so the CRYPTO_set_id_callback() function needs to be called before there's any + chance of OpenSSL being accessed from multiple threads. +*/ +struct openssl_thread_config +{ + static boost::mutex* openssl_mutexes; + static unsigned long get_thread_id(); + static void locking_callback(int mode, int type, const char *file, int line); + openssl_thread_config(); + ~openssl_thread_config(); +}; +openssl_thread_config openssl_thread_config_manager; + +boost::mutex* openssl_thread_config::openssl_mutexes = nullptr; + +unsigned long openssl_thread_config::get_thread_id() +{ +#ifdef _MSC_VER + return (unsigned long)::GetCurrentThreadId(); +#else + return (unsigned long)(&fc::thread::current()); // TODO: should expose boost thread id +#endif +} + +void openssl_thread_config::locking_callback(int mode, int type, const char *file, int line) +{ + if (mode & CRYPTO_LOCK) + openssl_mutexes[type].lock(); + else + openssl_mutexes[type].unlock(); +} + +// Warning: Things get complicated if third-party libraries also try to install their their own +// OpenSSL thread functions. Right now, we don't install our own handlers if another library has +// installed them before us which is a partial solution, but you'd really need to evaluate +// each library that does this to make sure they will play nice. +openssl_thread_config::openssl_thread_config() +{ + if (CRYPTO_get_id_callback() == NULL && + CRYPTO_get_locking_callback() == NULL) + { + openssl_mutexes = new boost::mutex[CRYPTO_num_locks()]; + CRYPTO_set_id_callback(&get_thread_id); + CRYPTO_set_locking_callback(&locking_callback); + } +} +openssl_thread_config::~openssl_thread_config() +{ + if (CRYPTO_get_id_callback() == &get_thread_id) + { + CRYPTO_set_id_callback(NULL); + CRYPTO_set_locking_callback(NULL); + delete[] openssl_mutexes; + openssl_mutexes = nullptr; + } +} + } // namespace fc diff --git a/src/crypto/elliptic.cpp b/src/crypto/elliptic.cpp index dd3c4cb..ed38ad2 100644 --- a/src/crypto/elliptic.cpp +++ b/src/crypto/elliptic.cpp @@ -265,7 +265,7 @@ namespace fc { namespace ecc { std::string public_key::to_base58() const { public_key_data key = serialize(); - uint32_t check = sha256::hash(key.data, sizeof(key))._hash[0]; + uint32_t check = (uint32_t)sha256::hash(key.data, sizeof(key))._hash[0]; assert(key.size() + sizeof(check) == 37); array data; memcpy(data.data, key.begin(), key.size()); @@ -280,7 +280,7 @@ namespace fc { namespace ecc { FC_ASSERT( s == sizeof(data) ); public_key_data key; - uint32_t check = sha256::hash(data.data, sizeof(key))._hash[0]; + uint32_t check = (uint32_t)sha256::hash(data.data, sizeof(key))._hash[0]; FC_ASSERT( memcmp( (char*)&check, data.data + sizeof(key), sizeof(check) ) == 0 ); memcpy( (char*)key.data, data.data, sizeof(key) ); return public_key(key); diff --git a/src/crypto/romix.cpp b/src/crypto/romix.cpp index 81c3477..f9b85a1 100644 --- a/src/crypto/romix.cpp +++ b/src/crypto/romix.cpp @@ -13,7 +13,7 @@ namespace fc { - romix::romix( u_int32_t memReqts, u_int32_t numIter, std::string salt ) : + romix::romix( uint32_t memReqts, uint32_t numIter, std::string salt ) : hashOutputBytes_( 64 ), kdfOutputBytes_( 32 ) { @@ -32,7 +32,7 @@ namespace fc // Prepare the lookup table char *lookupTable_ = new char[memoryReqtBytes_]; - u_int32_t const HSZ = hashOutputBytes_; + uint32_t const HSZ = hashOutputBytes_; // First hash to seed the lookup table, input is variable length anyway fc::sha512 hash = sha512.hash(saltedPassword); @@ -40,7 +40,7 @@ namespace fc // Compute consecutive hashes of the passphrase // Every iteration is stored in the next 64-bytes in the Lookup table - for( u_int32_t nByte = 0; nByte < memoryReqtBytes_ - HSZ; nByte += HSZ ) + for( uint32_t nByte = 0; nByte < memoryReqtBytes_ - HSZ; nByte += HSZ ) { // Compute hash of slot i, put result in slot i+1 fc::sha512 hash = sha512.hash(lookupTable_ + nByte, HSZ); @@ -54,11 +54,11 @@ namespace fc // We "integerize" a hash value by taking the last 4 bytes of // as a u_int32_t, and take modulo sequenceCount - u_int64_t* X64ptr = (u_int64_t*)(X.data()); - u_int64_t* Y64ptr = (u_int64_t*)(Y.data()); - u_int64_t* V64ptr = NULL; - u_int32_t newIndex; - u_int32_t const nXorOps = HSZ / sizeof(u_int64_t); + uint64_t* X64ptr = (uint64_t*)(X.data()); + uint64_t* Y64ptr = (uint64_t*)(Y.data()); + uint64_t* V64ptr = NULL; + uint32_t newIndex; + uint32_t const nXorOps = HSZ / sizeof(uint64_t); // Pure ROMix would use sequenceCount_ for the number of lookups. // We divide by 2 to reduce computation time RELATIVE to the memory usage @@ -66,17 +66,17 @@ namespace fc // memory in the same amount of time (and this is the justification for // the scrypt algorithm -- it is basically ROMix, modified for more // flexibility in controlling compute-time vs memory-usage). - u_int32_t const nLookups = sequenceCount_ / 2; - for(u_int32_t nSeq=0; nSeq - V64ptr = (u_int64_t*)(lookupTable_ + HSZ * newIndex); + V64ptr = (uint64_t*)(lookupTable_ + HSZ * newIndex); // xor X with V, and store the result in X - for(u_int32_t i = 0; i < nXorOps; i++) + for(uint32_t i = 0; i < nXorOps; i++) *(Y64ptr + i) = *(X64ptr + i) ^ *(V64ptr + i); // Hash the xor'd data to get the next index for lookup @@ -91,7 +91,7 @@ namespace fc std::string romix::deriveKey( std::string const & password ) { std::string masterKey(password); - for(u_int32_t i=0; i - -#include +#include #include - -#include - -#define SCRYPT_SALSA 1 -#define SCRYPT_SHA256 1 - -/* -#include "code/scrypt-jane-portable.h" -#include "code/scrypt-jane-romix.h" -*/ +#include "scrypt-jane.h" namespace fc { - void scrypt_derive_key( const std::vector &passphrase, const std::vector &salt, - unsigned int n, unsigned int r, unsigned int p, std::vector &key ) - { - /* - unsigned int chunk_bytes = SCRYPT_BLOCK_BYTES * r * 2; - std::vector yx((p+1) * chunk_bytes); + unsigned log2( unsigned n ) + { + if( n <= 0 ) FC_THROW_EXCEPTION( exception, "cannot take log2(${n})", ("n",n) ); + unsigned i = 0; + while( n >>= 1 ) ++i; + return i; + } - unsigned char *Y = &yx[0]; - unsigned char *X = &yx[chunk_bytes]; - - if(PKCS5_PBKDF2_HMAC( (const char*)&passphrase[0], passphrase.size(), - &salt[0], salt.size(), 1, - EVP_sha256(), chunk_bytes * p, X) != 1 ) - { - std::fill( yx.begin(), yx.end(), 0 ); - FC_THROW_EXCEPTION( exception, "error generating key material", - ("s", ERR_error_string( ERR_get_error(), nullptr) ) ); - } - - std::vector v(n * chunk_bytes); - - for( unsigned int i = 0; i < p; i++ ) - scrypt_ROMix_basic( (uint32_t*)(X+(chunk_bytes*i)), (uint32_t*)Y, (uint32_t*)&v[0], n, r ); - - if(PKCS5_PBKDF2_HMAC( (const char*)&passphrase[0], passphrase.size(), - X, chunk_bytes * p, 1, - EVP_sha256(), key.size(), &key[0]) != 1 ) - { - std::fill( yx.begin(), yx.end(), 0 ); - std::fill( v.begin(), v.end(), 0 ); - FC_THROW_EXCEPTION( exception, "error generating key material", - ("s", ERR_error_string( ERR_get_error(), nullptr) ) ); - } - - std::fill( yx.begin(), yx.end(), 0 ); - std::fill( v.begin(), v.end(), 0 ); - */ - } + void scrypt_derive_key( const std::vector& passphrase, const std::vector& salt, + unsigned int n, unsigned int r, unsigned int p, std::vector& key ) + { + scrypt( passphrase.data(), passphrase.size(), salt.data(), salt.size(), + log2( n ) - 1, log2( r ), log2( p ), key.data(), key.capacity() ); + } } // namespace fc diff --git a/src/exception.cpp b/src/exception.cpp index 186317c..8988cde 100644 --- a/src/exception.cpp +++ b/src/exception.cpp @@ -157,7 +157,7 @@ namespace fc string exception::to_string( log_level ll )const { fc::stringstream ss; - ss << what() << "(" << variant(my->_code).as_string() <<")\n"; + ss << what() << " (" << variant(my->_code).as_string() <<")\n"; for( auto itr = my->_elog.begin(); itr != my->_elog.end(); ++itr ) { ss << fc::format_string( itr->get_format(), itr->get_data() ) <<"\n"; diff --git a/src/interprocess/process.cpp b/src/interprocess/process.cpp index 5504138..026c120 100644 --- a/src/interprocess/process.cpp +++ b/src/interprocess/process.cpp @@ -74,7 +74,7 @@ process::~process(){} iprocess& process::exec( const fc::path& exe, std::vector args, - const fc::path& work_dir, exec_opts opt ) + const fc::path& work_dir, int opt ) { my->pctx.work_dir = work_dir.string(); diff --git a/src/log/file_appender.cpp b/src/log/file_appender.cpp index 7521d26..686ed65 100644 --- a/src/log/file_appender.cpp +++ b/src/log/file_appender.cpp @@ -1,36 +1,181 @@ -#include -#include -#include -#include +#include +#include #include -#include +#include #include +#include +#include +#include +#include +#include #include +#include #include -#include - namespace fc { + + static const string compression_extension( ".lzma" ); + class file_appender::impl : public fc::retainable { public: - config cfg; - ofstream out; - boost::mutex slock; + config cfg; + ofstream out; + boost::mutex slock; + + private: + future _rotation_task; + time_point_sec _current_file_start_time; + std::unique_ptr _compression_thread; + + time_point_sec get_file_start_time( const time_point_sec& timestamp, const microseconds& interval ) + { + const auto interval_seconds = interval.to_seconds(); + const auto file_number = timestamp.sec_since_epoch() / interval_seconds; + return time_point_sec( file_number * interval_seconds ); + } + + string timestamp_to_string( const time_point_sec& timestamp ) + { + auto ptime = boost::posix_time::from_time_t( time_t ( timestamp.sec_since_epoch() ) ); + return boost::posix_time::to_iso_string( ptime ); + } + + time_point_sec string_to_timestamp( const string& str ) + { + return time_point::from_iso_string( str ); + } + + void compress_file( const string& filename ) + { + FC_ASSERT( cfg.rotate && cfg.rotation_compression ); + FC_ASSERT( _compression_thread ); + if( !_compression_thread->is_current() ) + { + _compression_thread->async( [this, filename]() { compress_file( filename ); } ).wait(); + return; + } + + try + { + lzma_compress_file( filename, filename + compression_extension ); + remove_all( filename ); + } + catch( ... ) + { + } + } + + public: + impl( const config& c) : cfg( c ) + { + if( cfg.rotate ) + { + FC_ASSERT( cfg.rotation_interval >= seconds( 1 ) ); + FC_ASSERT( cfg.rotation_limit >= cfg.rotation_interval ); + + if( cfg.rotation_compression ) + _compression_thread.reset( new thread( "compression") ); + + _rotation_task = async( [this]() { rotate_files( true ); } ); + } + } + + ~impl() + { + try + { + _rotation_task.cancel_and_wait(); + if( _compression_thread ) _compression_thread->quit(); + } + catch( ... ) + { + } + } + + void rotate_files( bool initializing = false ) + { + FC_ASSERT( cfg.rotate ); + const auto now = time_point::now(); + const auto start_time = get_file_start_time( now, cfg.rotation_interval ); + const auto timestamp_string = timestamp_to_string( start_time ); + const auto link_filename = cfg.filename.string(); + const auto log_filename = link_filename + "." + timestamp_string; + + { + fc::scoped_lock lock( slock ); + + if( !initializing ) + { + if( start_time <= _current_file_start_time ) + { + _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds() ); + return; + } + + out.flush(); + out.close(); + } + + out.open( log_filename.c_str() ); + } + remove_all( link_filename ); + create_hard_link( log_filename, link_filename ); + + /* Delete old log files */ + const auto limit_time = now - cfg.rotation_limit; + auto itr = directory_iterator( fc::path( link_filename ).parent_path() ); + for( ; itr != directory_iterator(); itr++ ) + { + try + { + const auto current_filename = itr->string(); + auto current_pos = current_filename.find( link_filename ); + if( current_pos != 0 ) continue; + current_pos = link_filename.size() + 1; + const auto current_timestamp_str = string( current_filename.begin() + current_pos, /* substr not working */ + current_filename.begin() + current_pos + timestamp_string.size() ); + const auto current_timestamp = string_to_timestamp( current_timestamp_str ); + if( current_timestamp < start_time ) + { + if( current_timestamp < limit_time || file_size( current_filename ) <= 0 ) + { + remove_all( current_filename ); + continue; + } + + if( !cfg.rotation_compression ) continue; + if( current_filename.find( compression_extension ) != string::npos ) continue; + compress_file( current_filename ); + } + } + catch( ... ) + { + } + } + + _current_file_start_time = start_time; + _rotation_task = schedule( [this]() { rotate_files(); }, _current_file_start_time + cfg.rotation_interval.to_seconds() ); + } }; file_appender::config::config( const fc::path& p ) :format( "${timestamp} ${thread_name} ${context} ${file}:${line} ${method} ${level}] ${message}" ), - filename(p),flush(true),truncate(true){} + filename(p),flush(true),truncate(true),rotate(false),rotation_compression(true){} file_appender::file_appender( const variant& args ) - :my( new impl() ) + :my( new impl( args.as() ) ) { - try { - my->cfg = args.as(); - fc::create_directories( fc::path( my->cfg.filename.string() ).parent_path() ); - my->out.open( my->cfg.filename.string().c_str() ); - } catch ( ... ) { - std::cerr << "error opening log file: " << my->cfg.filename.string() << "\n"; - //elog( "%s", fc::except_str().c_str() ); + std::string log_filename; + try + { + log_filename = my->cfg.filename.string(); + + fc::create_directories( fc::path( log_filename ).parent_path() ); + + if( !my->cfg.rotate ) my->out.open( log_filename.c_str() ); + } + catch( ... ) + { + std::cerr << "error opening log file: " << log_filename << "\n"; } } file_appender::~file_appender(){} @@ -60,14 +205,15 @@ namespace fc { fc::string message = fc::format_string( m.get_format(), m.get_data() ); line << message.c_str(); - //fc::variant lmsg(m); - // fc::string fmt_str = fc::format_string( my->cfg.format, mutable_variant_object(m.get_context())( "message", message) ); + // fc::string fmt_str = fc::format_string( my->cfg.format, mutable_variant_object(m.get_context())( "message", message) ); + { - fc::scoped_lock lock(my->slock); + fc::scoped_lock lock( my->slock ); my->out << line.str() << "\t\t\t" << m.get_context().get_file() <<":"<cfg.flush ) my->out.flush(); } - if( my->cfg.flush ) my->out.flush(); } -} + +} // fc diff --git a/src/network/ntp.cpp b/src/network/ntp.cpp index a764448..c938edf 100644 --- a/src/network/ntp.cpp +++ b/src/network/ntp.cpp @@ -16,20 +16,19 @@ namespace fc class ntp_impl { public: - ntp_impl() :_request_interval_sec( 60*60 /* 1 hr */) + ntp_impl():_request_interval_sec( 60*60 /* 1 hr */),_ntp_thread("ntp") { - _next_request_time = fc::time_point::now(); _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) ); } /** vector < host, port > */ std::vector< std::pair< std::string, uint16_t> > _ntp_hosts; - fc::future _request_loop; fc::future _read_loop; udp_socket _sock; uint32_t _request_interval_sec; - fc::time_point _next_request_time; + fc::time_point _last_request_time; optional _last_ntp_delta; + fc::thread _ntp_thread; void request_now() { @@ -43,6 +42,7 @@ namespace fc { ilog( "sending request to ${ep}", ("ep",ep) ); std::array send_buf { {010,0,0,0,0,0,0,0,0} }; + _last_request_time = fc::time_point::now(); _sock.send_to( (const char*)send_buf.data(), send_buf.size(), ep ); break; } @@ -55,17 +55,10 @@ namespace fc } } // request_now - void request_loop() + void request_time() { - while( !_request_loop.canceled() ) - { - if( _next_request_time < fc::time_point::now() ) - { - _next_request_time += fc::seconds( _request_interval_sec ); - request_now(); - } - fc::usleep( fc::seconds(1) ); // TODO: fix FC timers.. - } // while + request_now(); + _ntp_thread.schedule( [=](){ request_time(); }, fc::time_point::now() + fc::seconds(_request_interval_sec) ); } // request_loop void read_loop() @@ -86,14 +79,19 @@ namespace fc uint32_t seconds_since_1900 = receive_timestamp_host >> 32; uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800; - auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds)); - if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && - fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) - { - _last_ntp_delta = ntp_time - fc::time_point::now(); - } + if( fc::time_point::now() - _last_request_time > fc::seconds(1) ) + request_now(); else - elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); + { + auto ntp_time = (fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds)); + if( ntp_time - fc::time_point::now() < fc::seconds(60*60*24) && + fc::time_point::now() - ntp_time < fc::seconds(60*60*24) ) + { + _last_ntp_delta = ntp_time - fc::time_point::now(); + } + else + elog( "NTP time is way off ${time}", ("time",ntp_time)("local",fc::time_point::now()) ); + } } } // read_loop }; @@ -108,17 +106,15 @@ namespace fc { my->_sock.open(); - my->_request_loop = fc::async( [=](){ my->request_loop(); } ); - my->_read_loop = fc::async( [=](){ my->read_loop(); } ); + my->_ntp_thread.async( [=](){ my->request_time(); } ); + my->_read_loop = my->_ntp_thread.async( [=](){ my->read_loop(); } ); } ntp::~ntp() { try { - my->_request_loop.cancel(); my->_read_loop.cancel(); my->_sock.close(); - my->_request_loop.wait(); my->_read_loop.wait(); } catch ( const fc::exception& ) @@ -137,8 +133,8 @@ namespace fc void ntp::set_request_interval( uint32_t interval_sec ) { my->_request_interval_sec = interval_sec; - my->_next_request_time = fc::time_point::now(); } + void ntp::request_now() { my->request_now(); diff --git a/src/network/tcp_socket.cpp b/src/network/tcp_socket.cpp index 2a53c14..184faaa 100644 --- a/src/network/tcp_socket.cpp +++ b/src/network/tcp_socket.cpp @@ -23,21 +23,25 @@ namespace fc { { if( _sock.is_open() ) _sock.close(); + if( _read_in_progress.valid() ) try { _read_in_progress.wait(); } catch ( ... ) {} + if( _write_in_progress.valid() ) try { _write_in_progress.wait(); } catch ( ... ) {} } virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override; virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override; + fc::future _write_in_progress; + fc::future _read_in_progress; boost::asio::ip::tcp::socket _sock; tcp_socket_io_hooks* _io_hooks; }; size_t tcp_socket::impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) { - return fc::asio::read_some(socket, boost::asio::buffer(buffer, length)); + return (_read_in_progress = fc::asio::read_some(socket, boost::asio::buffer(buffer, length))).wait(); } size_t tcp_socket::impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) { - return fc::asio::write_some(socket, boost::asio::buffer(buffer, length)); + return (_write_in_progress = fc::asio::write_some(socket, boost::asio::buffer(buffer, length))).wait(); } diff --git a/src/network/udt_socket.cpp b/src/network/udt_socket.cpp new file mode 100644 index 0000000..a065aac --- /dev/null +++ b/src/network/udt_socket.cpp @@ -0,0 +1,395 @@ +#include +#include +#include +#include +#include +#include + +#ifndef WIN32 +# include +#endif + +namespace fc { + + void check_udt_errors() + { + UDT::ERRORINFO& error_info = UDT::getlasterror(); + if( error_info.getErrorCode() ) + { + std::string error_message = error_info.getErrorMessage(); + error_info.clear(); + FC_CAPTURE_AND_THROW( udt_exception, (error_message) ); + } + } + + class udt_epoll_service + { + public: + udt_epoll_service() + :_epoll_thread("udt_epoll") + { + UDT::startup(); + check_udt_errors(); + _epoll_id = UDT::epoll_create(); + _epoll_loop = _epoll_thread.async( [=](){ poll_loop(); } ); + } + + ~udt_epoll_service() + { + _epoll_loop.cancel(); + _epoll_loop.wait(); + UDT::cleanup(); + } + + void poll_loop() + { + std::set read_ready; + std::set write_ready; + while( !_epoll_loop.canceled() ) + { + UDT::epoll_wait( _epoll_id, + &read_ready, + &write_ready, 100000000 ); + + { synchronized(_read_promises_mutex) + for( auto sock : read_ready ) + { + auto itr = _read_promises.find( sock ); + if( itr != _read_promises.end() ) + { + itr->second->set_value(); + _read_promises.erase(itr); + } + } + } // synchronized read promise mutex + + { synchronized(_write_promises_mutex) + for( auto sock : write_ready ) + { + auto itr = _write_promises.find( sock ); + if( itr != _write_promises.end() ) + { + itr->second->set_value(); + _write_promises.erase(itr); + } + } + } // synchronized write promise mutex + } // while not canceled + } // poll_loop + + + void notify_read( int udt_socket_id, + const promise::ptr& p ) + { + int events = UDT_EPOLL_IN | UDT_EPOLL_ERR; + if( 0 != UDT::epoll_add_usock( _epoll_id, + udt_socket_id, + &events ) ) + { + check_udt_errors(); + } + { synchronized(_read_promises_mutex) + + _read_promises[udt_socket_id] = p; + } + } + + void notify_write( int udt_socket_id, + const promise::ptr& p ) + { + int events = UDT_EPOLL_OUT | UDT_EPOLL_ERR; + if( 0 != UDT::epoll_add_usock( _epoll_id, + udt_socket_id, + &events ) ) + { + check_udt_errors(); + } + + { synchronized(_write_promises_mutex) + _write_promises[udt_socket_id] = p; + } + } + void remove( int udt_socket_id ) + { + { synchronized(_read_promises_mutex) + auto read_itr = _read_promises.find( udt_socket_id ); + if( read_itr != _read_promises.end() ) + { + read_itr->second->set_exception( fc::copy_exception( fc::canceled_exception() ) ); + _read_promises.erase(read_itr); + } + } + { synchronized(_write_promises_mutex) + auto write_itr = _write_promises.find( udt_socket_id ); + if( write_itr != _write_promises.end() ) + { + write_itr->second->set_exception( fc::copy_exception( fc::canceled_exception() ) ); + _write_promises.erase(write_itr); + } + } + UDT::epoll_remove_usock( _epoll_id, udt_socket_id ); + } + + private: + fc::mutex _read_promises_mutex; + fc::mutex _write_promises_mutex; + std::unordered_map::ptr > _read_promises; + std::unordered_map::ptr > _write_promises; + + fc::future _epoll_loop; + fc::thread _epoll_thread; + int _epoll_id; + }; + + + udt_epoll_service& default_epool_service() + { + static udt_epoll_service* default_service = new udt_epoll_service(); + return *default_service; + } + + + + udt_socket::udt_socket() + :_udt_socket_id( UDT::INVALID_SOCK ) + { + } + + udt_socket::~udt_socket() + { + try { + close(); + } catch ( const fc::exception& e ) + { + wlog( "${e}", ("e", e.to_detail_string() ) ); + } + } + + void udt_socket::bind( const fc::ip::endpoint& local_endpoint ) + { try { + if( !is_open() ) + open(); + + sockaddr_in local_addr; + local_addr.sin_family = AF_INET; + local_addr.sin_port = htons(local_endpoint.port()); + local_addr.sin_addr.s_addr = htonl(local_endpoint.get_address()); + + if( UDT::ERROR == UDT::bind(_udt_socket_id, (sockaddr*)&local_addr, sizeof(local_addr)) ) + check_udt_errors(); + } FC_CAPTURE_AND_RETHROW() } + + void udt_socket::connect_to( const ip::endpoint& remote_endpoint ) + { try { + if( !is_open() ) + open(); + + sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(remote_endpoint.port()); + serv_addr.sin_addr.s_addr = htonl(remote_endpoint.get_address()); + + // UDT doesn't allow now blocking connects... + fc::thread connect_thread("connect_thread"); + connect_thread.async( [&](){ + if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) ) + check_udt_errors(); + }).wait(); + + bool block = false; + UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool)); + UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool)); + check_udt_errors(); + + } FC_CAPTURE_AND_RETHROW( (remote_endpoint) ) } + + ip::endpoint udt_socket::remote_endpoint() const + { try { + sockaddr_in peer_addr; + int peer_addr_size = sizeof(peer_addr); + int error_code = UDT::getpeername( _udt_socket_id, (struct sockaddr*)&peer_addr, &peer_addr_size ); + if( error_code == UDT::ERROR ) + check_udt_errors(); + return ip::endpoint( ip::address( htonl( peer_addr.sin_addr.s_addr ) ), htons(peer_addr.sin_port) ); + } FC_CAPTURE_AND_RETHROW() } + + ip::endpoint udt_socket::local_endpoint() const + { try { + sockaddr_in sock_addr; + int addr_size = sizeof(sock_addr); + int error_code = UDT::getsockname( _udt_socket_id, (struct sockaddr*)&sock_addr, &addr_size ); + if( error_code == UDT::ERROR ) + check_udt_errors(); + return ip::endpoint( ip::address( htonl( sock_addr.sin_addr.s_addr ) ), htons(sock_addr.sin_port) ); + } FC_CAPTURE_AND_RETHROW() } + + + /// @{ + size_t udt_socket::readsome( char* buffer, size_t max ) + { try { + auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 ); + while( bytes_read == UDT::ERROR ) + { + if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV ) + { + UDT::getlasterror().clear(); + promise::ptr p(new promise("udt_socket::readsome")); + default_epool_service().notify_read( _udt_socket_id, p ); + p->wait(); + bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 ); + } + else + check_udt_errors(); + } + return bytes_read; + } FC_CAPTURE_AND_RETHROW( (max) ) } + + bool udt_socket::eof()const + { + // TODO... + return false; + } + /// @} + + /// ostream interface + /// @{ + size_t udt_socket::writesome( const char* buffer, size_t len ) + { try { + auto bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0); + + while( UDT::ERROR == bytes_sent ) + { + if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCSND ) + { + UDT::getlasterror().clear(); + promise::ptr p(new promise("udt_socket::writesome")); + default_epool_service().notify_write( _udt_socket_id, p ); + p->wait(); + bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0); + continue; + } + else + check_udt_errors(); + } + return bytes_sent; + } FC_CAPTURE_AND_RETHROW( (len) ) } + + void udt_socket::flush(){} + + void udt_socket::close() + { try { + if( is_open() ) + { + default_epool_service().remove( _udt_socket_id ); + UDT::close( _udt_socket_id ); + check_udt_errors(); + _udt_socket_id = UDT::INVALID_SOCK; + } + else + { + wlog( "already closed" ); + } + } FC_CAPTURE_AND_RETHROW() } + /// @} + + void udt_socket::open() + { + _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0); + if( _udt_socket_id == UDT::INVALID_SOCK ) + check_udt_errors(); + } + + bool udt_socket::is_open()const + { + return _udt_socket_id != UDT::INVALID_SOCK; + } + + + + + + + udt_server::udt_server() + :_udt_socket_id( UDT::INVALID_SOCK ) + { + _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0); + if( _udt_socket_id == UDT::INVALID_SOCK ) + check_udt_errors(); + + bool block = false; + UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool)); + check_udt_errors(); + UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool)); + check_udt_errors(); + } + + udt_server::~udt_server() + { + try { + close(); + } catch ( const fc::exception& e ) + { + wlog( "${e}", ("e", e.to_detail_string() ) ); + } + } + + void udt_server::close() + { try { + if( _udt_socket_id != UDT::INVALID_SOCK ) + { + UDT::close( _udt_socket_id ); + check_udt_errors(); + default_epool_service().remove( _udt_socket_id ); + _udt_socket_id = UDT::INVALID_SOCK; + } + } FC_CAPTURE_AND_RETHROW() } + + void udt_server::accept( udt_socket& s ) + { try { + FC_ASSERT( !s.is_open() ); + int namelen; + sockaddr_in their_addr; + + + while( s._udt_socket_id == UDT::INVALID_SOCK ) + { + s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen ); + if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV ) + { + UDT::getlasterror().clear(); + promise::ptr p(new promise("udt_server::accept")); + default_epool_service().notify_read( _udt_socket_id, p ); + p->wait(); + s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen ); + } + else + check_udt_errors(); + } + } FC_CAPTURE_AND_RETHROW() } + + void udt_server::listen( const ip::endpoint& ep ) + { try { + sockaddr_in my_addr; + my_addr.sin_family = AF_INET; + my_addr.sin_port = htons(ep.port()); + my_addr.sin_addr.s_addr = INADDR_ANY; + memset(&(my_addr.sin_zero), '\0', 8); + + if( UDT::ERROR == UDT::bind(_udt_socket_id, (sockaddr*)&my_addr, sizeof(my_addr)) ) + check_udt_errors(); + + UDT::listen(_udt_socket_id, 10); + check_udt_errors(); + } FC_CAPTURE_AND_RETHROW( (ep) ) } + + fc::ip::endpoint udt_server::local_endpoint() const + { try { + sockaddr_in sock_addr; + int addr_size = sizeof(sock_addr); + int error_code = UDT::getsockname( _udt_socket_id, (struct sockaddr*)&sock_addr, &addr_size ); + if( error_code == UDT::ERROR ) + check_udt_errors(); + return ip::endpoint( ip::address( htonl( sock_addr.sin_addr.s_addr ) ), htons(sock_addr.sin_port) ); + } FC_CAPTURE_AND_RETHROW() } + +} diff --git a/src/rpc/json_connection.cpp b/src/rpc/json_connection.cpp index 226d145..fa01fd0 100644 --- a/src/rpc/json_connection.cpp +++ b/src/rpc/json_connection.cpp @@ -518,6 +518,42 @@ namespace fc { namespace rpc { my->_out->flush(); return my->_awaiting[id]; } + future json_connection::async_call( const fc::string& method, + const variant& a1, + const variant& a2, + const variant& a3, + const variant& a4, const variant& a5, const variant& a6, const variant& a7, const variant& a8 ) + { + auto id = my->_next_id++; + my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); + + { + fc::scoped_lock lock(my->_write_mutex); + *my->_out << "{\"id\":"; + *my->_out << id; + *my->_out << ",\"method\":"; + json::to_stream( *my->_out, method ); + *my->_out << ",\"params\":["; + fc::json::to_stream( *my->_out, a1 ); + *my->_out << ","; + fc::json::to_stream( *my->_out, a2 ); + *my->_out << ","; + fc::json::to_stream( *my->_out, a3 ); + *my->_out << ","; + fc::json::to_stream( *my->_out, a4 ); + *my->_out << ","; + fc::json::to_stream( *my->_out, a5 ); + *my->_out << ","; + fc::json::to_stream( *my->_out, a6 ); + *my->_out << ","; + fc::json::to_stream( *my->_out, a7 ); + *my->_out << ","; + fc::json::to_stream( *my->_out, a8 ); + *my->_out << "]}\n"; + } + my->_out->flush(); + return my->_awaiting[id]; + } future json_connection::async_call( const fc::string& method, mutable_variant_object named_args ) { diff --git a/src/thread/context.hpp b/src/thread/context.hpp index 5f7f8d6..35d83d7 100644 --- a/src/thread/context.hpp +++ b/src/thread/context.hpp @@ -53,15 +53,15 @@ namespace fc { { #if BOOST_VERSION >= 105400 bco::stack_context stack_ctx; - size_t stack_size = bco::stack_allocator::default_stacksize() * 8; + size_t stack_size = bco::stack_allocator::default_stacksize() * 4; alloc.allocate(stack_ctx, stack_size); my_context = bc::make_fcontext( stack_ctx.sp, stack_ctx.size, sf); #elif BOOST_VERSION >= 105300 - size_t stack_size = bco::stack_allocator::default_stacksize(); + size_t stack_size = bco::stack_allocator::default_stacksize() * 4; void* stackptr = alloc.allocate(stack_size); my_context = bc::make_fcontext( stackptr, stack_size, sf); #else - size_t stack_size = bc::default_stacksize(); + size_t stack_size = bc::default_stacksize() * 4; my_context.fc_stack.base = alloc.allocate( stack_size ); my_context.fc_stack.limit = static_cast( my_context.fc_stack.base) - stack_size; make_fcontext( &my_context, sf ); @@ -91,13 +91,13 @@ namespace fc { delete my_context; #elif BOOST_VERSION >= 105400 if(stack_alloc) - stack_alloc->deallocate( my_context->fc_stack.sp, bco::stack_allocator::default_stacksize() ); + stack_alloc->deallocate( my_context->fc_stack.sp, bco::stack_allocator::default_stacksize() * 4 ); else delete my_context; #elif BOOST_VERSION >= 105300 if(stack_alloc) - stack_alloc->deallocate( my_context->fc_stack.sp, bco::stack_allocator::default_stacksize() ); + stack_alloc->deallocate( my_context->fc_stack.sp, bco::stack_allocator::default_stacksize() * 4); else delete my_context; #else diff --git a/src/thread/task.cpp b/src/thread/task.cpp index 98edbd3..c3e1feb 100644 --- a/src/thread/task.cpp +++ b/src/thread/task.cpp @@ -35,7 +35,10 @@ namespace fc { } void task_base::run_impl() { try { - _run_functor( _functor, _promise_impl ); + if( !canceled() ) + _run_functor( _functor, _promise_impl ); + else + FC_THROW_EXCEPTION( canceled_exception, "${description}", ("description", get_desc() ) ); } catch ( const exception& e ) { diff --git a/src/thread/thread.cpp b/src/thread/thread.cpp index 3bb7444..6968417 100644 --- a/src/thread/thread.cpp +++ b/src/thread/thread.cpp @@ -70,11 +70,11 @@ namespace fc { return t; } - thread::thread( const char* name ) { + thread::thread( const std::string& name ) { promise::ptr p(new promise()); boost::thread* t = new boost::thread( [this,p,name]() { try { - set_thread_name(name); // set thread's name for the debugger to display + set_thread_name(name.c_str()); // set thread's name for the debugger to display this->my = new thread_d(*this); current_thread() = this; p->set_value(); @@ -108,7 +108,7 @@ namespace fc { } thread::~thread() { - //slog( "my %p", my ); + //wlog( "my ${n}", ("n",name()) ); if( is_current() ) { wlog( "delete my" ); @@ -126,22 +126,20 @@ namespace fc { void thread::debug( const fc::string& d ) { /*my->debug(d);*/ } void thread::quit() { - //if quiting from a different thread, start quit task on thread. + //if quitting from a different thread, start quit task on thread. //If we have and know our attached boost thread, wait for it to finish, then return. if( ¤t() != this ) { async( [=](){quit();} );//.wait(); if( my->boost_thread ) { auto n = name(); - ilog( "joining... ${n}", ("n",n) );//n.c_str() ); my->boost_thread->join(); delete my; my = nullptr; - ilog( "done joining...${n}", ("n",n) ); //n.c_str() ); } return; } - // wlog( "%s", my->name.c_str() ); + wlog( "${s}", ("s",name()) ); // We are quiting from our own thread... // break all promises, thread quit! @@ -157,8 +155,8 @@ namespace fc { cur = n; } if( my->blocked ) { - // wlog( "still blocking... whats up with that?"); - // debug( "on quit" ); + wlog( "still blocking... whats up with that?"); + debug( "on quit" ); } } BOOST_ASSERT( my->blocked == 0 ); @@ -200,7 +198,7 @@ namespace fc { void thread::exec() { if( !my->current ) my->current = new fc::context(&fc::thread::current()); try { - my->process_tasks(); + my->process_tasks(); } catch( canceled_exception& ) { @@ -225,35 +223,15 @@ namespace fc { my->start_next_fiber(reschedule); my->check_fiber_exceptions(); } + void thread::sleep_until( const time_point& tp ) { - //ilog( "sleep until ${tp} wait: ${delta}", ("tp",tp)("delta",(tp-fc::time_point::now()).count()) ); - if( tp <= (time_point::now()+fc::microseconds(10000)) ) { this->yield(true); } my->yield_until( tp, false ); - /* - my->check_fiber_exceptions(); - - BOOST_ASSERT( ¤t() == this ); - if( !my->current ) { - my->current = new fc::context(&fc::thread::current()); - } - - my->current->resume_time = tp; - my->current->clear_blocking_promises(); - - my->sleep_pqueue.push_back(my->current); - std::push_heap( my->sleep_pqueue.begin(), - my->sleep_pqueue.end(), sleep_priority_less() ); - - my->start_next_fiber(); - my->current->resume_time = time_point::maximum(); - - my->check_fiber_exceptions(); - */ } + int thread::wait_any_until( std::vector&& p, const time_point& timeout) { for( size_t i = 0; i < p.size(); ++i ) { if( p[i]->ready() ) return i; diff --git a/src/thread/thread_d.hpp b/src/thread/thread_d.hpp index 6e26720..314e0a4 100644 --- a/src/thread/thread_d.hpp +++ b/src/thread/thread_d.hpp @@ -239,6 +239,30 @@ namespace fc { } return p; } + + bool process_canceled_tasks() + { + bool canceled_task = false; + for( auto task_itr = task_sch_queue.begin(); + task_itr != task_sch_queue.end(); + ) + { + if( (*task_itr)->canceled() ) + { + (*task_itr)->run(); + (*task_itr)->release(); + task_itr = task_sch_queue.erase(task_itr); + canceled_task = true; + continue; + } + ++task_itr; + } + + if( canceled_task ) + std::make_heap( task_sch_queue.begin(), task_sch_queue.end(), task_when_less() ); + + return canceled_task; + } /** * This should be before or after a context switch to @@ -341,6 +365,7 @@ namespace fc { bool run_next_task() { check_for_timeouts(); task_base* next = dequeue(); + if( next ) { next->_set_active_context( current ); current->cur_task = next; @@ -377,6 +402,8 @@ namespace fc { continue; } + if( process_canceled_tasks() ) continue; + clear_free_list(); { // lock scope @@ -388,6 +415,10 @@ namespace fc { if( timeout_time == time_point::maximum() ) { task_ready.wait( lock ); } else if( timeout_time != time_point::min() ) { + // there may be tasks that have been canceled we should filter them out now + // rather than waiting... + + /* This bit is kind of sloppy -- this wait was originally implemented as a wait * with respect to boost::chrono::system_clock. This behaved rather comically * if you were to do a: diff --git a/src/time.cpp b/src/time.cpp index 2b40f95..dc50544 100644 --- a/src/time.cpp +++ b/src/time.cpp @@ -92,7 +92,7 @@ namespace fc { return result.str(); } uint32_t years_ago = days_ago / 365; - result << years_ago << " year" << (months_ago > 1 ? "s" : ""); + result << years_ago << " year" << (months_ago > 1 ? "s " : " "); if (months_ago < 12 * 5) { uint32_t leftover_days = days_ago - (years_ago * 365); diff --git a/tests/lzma_test.cpp b/tests/lzma_test.cpp new file mode 100644 index 0000000..3457597 --- /dev/null +++ b/tests/lzma_test.cpp @@ -0,0 +1,24 @@ +#include +#include + +#include +#include + +using namespace fc; + +int main( int argc, char** argv ) +{ + if( argc != 2 ) + { + std::cout << "usage: " << argv[0] << " \n"; + exit( -1 ); + } + + auto src = std::string( argv[1] ); + auto dst = src + ".compressed"; + lzma_compress_file( src, dst ); + + lzma_decompress_file( dst, src + ".decompressed" ); + + return 0; +} diff --git a/tests/task_cancel.cpp b/tests/task_cancel.cpp new file mode 100644 index 0000000..e63544c --- /dev/null +++ b/tests/task_cancel.cpp @@ -0,0 +1,92 @@ +#define BOOST_TEST_MODULE fc_task_cancel_tests +#include + +#include +#include +#include + +BOOST_AUTO_TEST_CASE( cancel_an_active_task ) +{ + enum task_result{sleep_completed, sleep_aborted}; + fc::future task = fc::async([]() { + BOOST_TEST_MESSAGE("Starting async task"); + try + { + fc::usleep(fc::seconds(5)); + return sleep_completed; + } + catch (const fc::exception&) + { + return sleep_aborted; + } + }); + + fc::time_point start_time = fc::time_point::now(); + + // wait a bit for the task to start running + fc::usleep(fc::milliseconds(100)); + + BOOST_TEST_MESSAGE("Canceling task"); + task.cancel(); + try + { + task_result result = task.wait(); + BOOST_CHECK_MESSAGE(result != sleep_completed, "sleep should have been canceled"); + } + catch (fc::exception& e) + { + BOOST_TEST_MESSAGE("Caught exception from canceled task: " << e.what()); + BOOST_CHECK_MESSAGE(fc::time_point::now() - start_time < fc::seconds(4), "Task was not canceled quickly"); + } +} + + +BOOST_AUTO_TEST_CASE( cleanup_cancelled_task ) +{ + std::shared_ptr some_string(std::make_shared("some string")); + fc::future task = fc::async([some_string]() { + BOOST_TEST_MESSAGE("Starting async task, bound string is " << *some_string); + try + { + fc::usleep(fc::seconds(5)); + BOOST_TEST_MESSAGE("Finsihed usleep in async task, leaving the task's functor"); + } + catch (...) + { + BOOST_TEST_MESSAGE("Caught exception in async task, leaving the task's functor"); + } + }); + std::weak_ptr weak_string_ptr(some_string); + some_string.reset(); + BOOST_CHECK_MESSAGE(!weak_string_ptr.expired(), "Weak pointer should still be valid because async task should be holding the strong pointer"); + fc::usleep(fc::milliseconds(100)); + BOOST_TEST_MESSAGE("Canceling task"); + task.cancel(); + try + { + task.wait(); + } + catch (fc::exception& e) + { + BOOST_TEST_MESSAGE("Caught exception from canceled task: " << e.what()); + } + BOOST_CHECK_MESSAGE(weak_string_ptr.expired(), "Weak pointer should now be invalid because async task should be done with it"); + task = fc::future(); + BOOST_CHECK_MESSAGE(weak_string_ptr.expired(), "Weak pointer should now be invalid because async task should have been destroyed"); +} + +BOOST_AUTO_TEST_CASE( cancel_scheduled_task ) +{ + bool task_executed = false; + try + { + auto result = fc::schedule( [&]() { task_executed = true; }, fc::time_point::now() + fc::seconds(3) ); + result.cancel(); + result.wait(); + } + catch ( const fc::exception& e ) + { + wlog( "${e}", ("e",e.to_detail_string() ) ); + } + BOOST_CHECK(!task_executed); +} \ No newline at end of file diff --git a/tests/udt_client.cpp b/tests/udt_client.cpp new file mode 100644 index 0000000..f575b28 --- /dev/null +++ b/tests/udt_client.cpp @@ -0,0 +1,36 @@ +#include +#include +#include + +using namespace std; +using namespace UDT; + +int main() +{ + UDTSOCKET client = UDT::socket(AF_INET, SOCK_STREAM, 0); + + sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(9000); + inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr); + + memset(&(serv_addr.sin_zero), '\0', 8); + + // connect to the server, implict bind + if (UDT::ERROR == UDT::connect(client, (sockaddr*)&serv_addr, sizeof(serv_addr))) + { + cout << "connect: " << UDT::getlasterror().getErrorMessage(); + return 0; + } + + char* hello = "hello world! 3\n"; + if (UDT::ERROR == UDT::send(client, hello, strlen(hello) + 1, 0)) + { + cout << "send: " << UDT::getlasterror().getErrorMessage(); + return 0; + } + + UDT::close(client); + + return 1; +} diff --git a/tests/udt_server.cpp b/tests/udt_server.cpp new file mode 100644 index 0000000..ff7e2fb --- /dev/null +++ b/tests/udt_server.cpp @@ -0,0 +1,83 @@ +#include +#include +#include + +using namespace std; + +int main( int argc, char** argv ) +{ + UDTSOCKET serv = UDT::socket(AF_INET, SOCK_STREAM, 0); + bool block = false; + + sockaddr_in my_addr; + my_addr.sin_family = AF_INET; + my_addr.sin_port = htons(9000); + my_addr.sin_addr.s_addr = INADDR_ANY; + memset(&(my_addr.sin_zero), '\0', 8); + + if (UDT::ERROR == UDT::bind(serv, (sockaddr*)&my_addr, sizeof(my_addr))) + { + cout << "bind: " << UDT::getlasterror().getErrorMessage(); + return 0; + } + UDT::listen(serv, 10); + + int namelen; + sockaddr_in their_addr; + + + UDT::setsockopt(serv, 0, UDT_SNDSYN, &block, sizeof(bool)); + UDT::setsockopt(serv, 0, UDT_RCVSYN, &block, sizeof(bool)); + UDTSOCKET recver = UDT::accept(serv, (sockaddr*)&their_addr, &namelen); + if( recver == UDT::INVALID_SOCK ) + { + if( UDT::getlasterror_code() == CUDTException::EASYNCRCV ) + { + std::cout << "nothing yet... better luck next time\n"; + } + } + auto pollid = UDT::epoll_create(); + UDT::epoll_add_usock(pollid, serv, nullptr );// const int* events = NULL); + std::set readready; + std::set writeready; + std::cout << "waiting for 5 seconds\n"; + UDT::epoll_wait( pollid, &readready, &writeready, 10000 ); + + + recver = UDT::accept(serv, (sockaddr*)&their_addr, &namelen); + if( recver == UDT::INVALID_SOCK ) + { + if( UDT::getlasterror_code() == CUDTException::EASYNCRCV ) + { + std::cout << "nothing yet... better luck next time\n"; + } + return 0; + } + UDT::setsockopt(recver, 0, UDT_SNDSYN, &block, sizeof(bool)); + UDT::setsockopt(recver, 0, UDT_RCVSYN, &block, sizeof(bool)); + UDT::epoll_remove_usock(pollid, serv );// const int* events = NULL); + int events = UDT_EPOLL_IN; + + UDT::epoll_add_usock(pollid, recver, &events );// const int* events = NULL); + + readready.clear(); + UDT::epoll_wait( pollid, &readready, &writeready, 5000 ); + + char ip[16]; + cout << "new connection: " << inet_ntoa(their_addr.sin_addr) << ":" << ntohs(their_addr.sin_port) << endl; + + char data[100]; + + while (UDT::ERROR == UDT::recv(recver, data, 100, 0)) + { + cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl; + UDT::epoll_wait( pollid, &readready, &writeready, 5000 ); + } + + cout << data << endl; + + UDT::close(recver); + UDT::close(serv); + + return 1; +} diff --git a/tests/udtc.cpp b/tests/udtc.cpp new file mode 100644 index 0000000..aa48bb3 --- /dev/null +++ b/tests/udtc.cpp @@ -0,0 +1,49 @@ +#include +#include +#include +#include +#include +#include + +using namespace fc; + +int main( int argc, char** argv ) +{ + try { + udt_socket sock; + sock.bind( fc::ip::endpoint::from_string( "127.0.0.1:6666" ) ); + ilog( "." ); + sock.connect_to( fc::ip::endpoint::from_string( "127.0.0.1:7777" ) ); + ilog( "after connect to..." ); + + std::cout << "local endpoint: " < response; + response.resize(1024); + int r = sock.readsome( response.data(), response.size() ); + while( r ) + { + std::cout.write( response.data(), r ); + r = sock.readsome( response.data(), response.size() ); + } + */ + // if we exit too quickly, UDT will not have a chance to + // send the graceful close message. + //fc::usleep( fc::seconds(1) ); + } catch ( const fc::exception& e ) + { + elog( "${e}", ("e",e.to_detail_string() ) ); + } + + return 0; +} diff --git a/tests/udts.cpp b/tests/udts.cpp new file mode 100644 index 0000000..8b8d575 --- /dev/null +++ b/tests/udts.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include +#include + +using namespace fc; + +int main( int argc, char** argv ) +{ + try { + udt_server serv; + serv.listen( fc::ip::endpoint::from_string( "127.0.0.1:7777" ) ); + + while( true ) + { + udt_socket sock; + serv.accept( sock ); + + std::vector response; + response.resize(1024); + int r = sock.readsome( response.data(), response.size() ); + while( r ) + { + std::cout.write( response.data(), r ); + r = sock.readsome( response.data(), response.size() ); + //sock.write( response.data(), response.size() ); + } + + std::string goodbye = "goodbye cruel world"; + sock.write( goodbye.c_str(), goodbye.size() ); + } + } catch ( const fc::exception& e ) + { + elog( "${e}", ("e",e.to_detail_string() ) ); + } + + return 0; +} diff --git a/vendor/easylzma/src/CMakeLists.txt b/vendor/easylzma/src/CMakeLists.txt index decbcc9..8772e6d 100644 --- a/vendor/easylzma/src/CMakeLists.txt +++ b/vendor/easylzma/src/CMakeLists.txt @@ -52,8 +52,8 @@ ENDIF (APPLE) ### copy the two required headers into our output dir as a post build step # copy public headers to output directory -FOREACH (header ${PUB_HDRS}) +#FOREACH (header ${PUB_HDRS}) # preserve relative pathing - ADD_CUSTOM_COMMAND(TARGET easylzma_s POST_BUILD - COMMAND ${CMAKE_COMMAND} -E copy_if_different ${header} ${incDir}) -ENDFOREACH (header ${PUB_HDRS}) + #ADD_CUSTOM_COMMAND(TARGET easylzma_s POST_BUILD + #COMMAND ${CMAKE_COMMAND} -E copy_if_different ${header} ${incDir}) + #ENDFOREACH (header ${PUB_HDRS}) diff --git a/vendor/scrypt-jane/CMakeLists.txt b/vendor/scrypt-jane/CMakeLists.txt new file mode 100644 index 0000000..ac66338 --- /dev/null +++ b/vendor/scrypt-jane/CMakeLists.txt @@ -0,0 +1,12 @@ +if( "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang" ) + set( CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -no-integrated-as" ) +endif() + +add_definitions( -DSCRYPT_SALSA ) +add_definitions( -DSCRYPT_SHA256 ) + +set( scrypt_sources + scrypt-jane.c + ) + +add_library( scrypt ${scrypt_sources} ) diff --git a/vendor/scrypt-jane/scrypt-jane.h b/vendor/scrypt-jane/scrypt-jane.h index 1c0df62..bebad72 100644 --- a/vendor/scrypt-jane/scrypt-jane.h +++ b/vendor/scrypt-jane/scrypt-jane.h @@ -19,9 +19,15 @@ #include -typedef void (*scrypt_fatal_errorfn)(const char *msg); -void scrypt_set_fatal_error(scrypt_fatal_errorfn fn); +#ifdef __cplusplus +extern "C" { +#endif + typedef void (*scrypt_fatal_errorfn)(const char *msg); + void scrypt_set_fatal_error(scrypt_fatal_errorfn fn); -void scrypt(const unsigned char *password, size_t password_len, const unsigned char *salt, size_t salt_len, unsigned char Nfactor, unsigned char rfactor, unsigned char pfactor, unsigned char *out, size_t bytes); + void scrypt(const unsigned char *password, size_t password_len, const unsigned char *salt, size_t salt_len, unsigned char Nfactor, unsigned char rfactor, unsigned char pfactor, unsigned char *out, size_t bytes); +#ifdef __cplusplus +} +#endif #endif /* SCRYPT_JANE_H */