From 460da3480172c7157d7900f3c3c39312cdebcc59 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Sat, 8 Sep 2012 17:37:25 -0400 Subject: [PATCH] adding more features from cmt --- CMakeLists.txt | 4 + include/fc/any.hpp | 12 +-- include/fc/buffer.hpp | 8 +- include/fc/endpoint.hpp | 35 ++++++++ include/fc/filesystem.hpp | 35 +++++++- include/fc/ip.hpp | 39 +++++++++ include/fc/mutex.hpp | 110 ++++++++++++++++++++++++ include/fc/sha1.hpp | 10 ++- include/fc/thread.hpp | 2 + include/fc/unique_lock.hpp | 10 ++- src/context.hpp | 5 +- src/filesystem.cpp | 55 ++++++++++++ src/ip.cpp | 46 ++++++++++ src/mutex.cpp | 167 +++++++++++++++++++++++++++++++++++++ src/sha1.cpp | 68 +++++++++------ src/thread.cpp | 3 + src/thread_d.hpp | 7 ++ 17 files changed, 569 insertions(+), 47 deletions(-) create mode 100644 include/fc/endpoint.hpp create mode 100644 include/fc/ip.hpp create mode 100644 include/fc/mutex.hpp create mode 100644 src/filesystem.cpp create mode 100644 src/ip.cpp create mode 100644 src/mutex.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a830694..b6c9603 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -62,6 +62,10 @@ set( sources src/hex.cpp src/sha1.cpp src/value_cast.cpp + src/filesystem.cpp + src/ip.cpp + src/bigint.cpp + src/mutex.cpp ) setup_library( fc SOURCES ${sources} ) diff --git a/include/fc/any.hpp b/include/fc/any.hpp index 5461cb6..632fdb5 100644 --- a/include/fc/any.hpp +++ b/include/fc/any.hpp @@ -1,13 +1,9 @@ #ifndef _FC_ANY_HPP_ #define _FC_ANY_HPP_ +#include -namespace fc { namespace reflect { - - // provides value semantics - struct any { - - }; - -} } +namespace fc { + typedef boost::any any; +} #endif // _FC_ANY_HPP_ diff --git a/include/fc/buffer.hpp b/include/fc/buffer.hpp index 3af9a21..e6fce6d 100644 --- a/include/fc/buffer.hpp +++ b/include/fc/buffer.hpp @@ -4,16 +4,16 @@ namespace fc { struct const_buffer { const_buffer( const char* const c = 0, size_t l = 0 ) - :data(c),len(l){} + :data(c),size(l){} const char* const data; - size_t len; + size_t size; }; struct mutable_buffer { mutable_buffer( char* c = 0, size_t l = 0 ) - :data(c),len(l){} + :data(c),size(l){} char* data; - size_t len; + size_t size; }; } diff --git a/include/fc/endpoint.hpp b/include/fc/endpoint.hpp new file mode 100644 index 0000000..939ed07 --- /dev/null +++ b/include/fc/endpoint.hpp @@ -0,0 +1,35 @@ +#ifndef _FC_IP_HPP_ +#define _FC_IP_HPP_ +#include + +namespace fc { + + namespace ip { + class address { + public: + address( uint32_t _ip = 0 ); + address( const fc::string& s ); + + address& operator=( const fc::string& s ); + operator fc::string()const; + + private: + uint32_t _ip; + }; + + class endpoint { + public: + endpoint(); + endpoint( const fc::string& i, uint16_t p ); + endpoint( const address& i, uint16_t p ); + + uint16_t port() { return _port; } + ip::address get_address() { return _ip; } + + private: + uint16_t _port; + address _ip; + }; + } +} +#endif // _FC_ENDPOINT_HPP_ diff --git a/include/fc/filesystem.hpp b/include/fc/filesystem.hpp index 20a7dd4..87d99d7 100644 --- a/include/fc/filesystem.hpp +++ b/include/fc/filesystem.hpp @@ -1,9 +1,40 @@ #ifndef _FC_FILESYSTEM_HPP_ #define _FC_FILESYSTEM_HPP_ -#include +#include +#include + +namespace boost { + namespace filesystem { + class path; + } +} + namespace fc { - typedef boost::filesystem::path path; + class path { + public: + path(); + ~path(); + path( const boost::filesystem::path& ); + path( const fc::string& p ); + path( const path& p ); + path( path&& p ); + path& operator =( const path& ); + path& operator =( path&& ); + + path& operator /=( const fc::path& ); + friend path operator /( const fc::path& p, const fc::path& ); + + operator boost::filesystem::path& (); + operator const boost::filesystem::path& ()const; + + fc::string string()const; + private: + fwd _p; + }; + + bool exists( const path& p ); + void create_directories( const path& p ); } #endif // _FC_FILESYSTEM_HPP_ diff --git a/include/fc/ip.hpp b/include/fc/ip.hpp new file mode 100644 index 0000000..143dfd5 --- /dev/null +++ b/include/fc/ip.hpp @@ -0,0 +1,39 @@ +#ifndef _FC_IP_HPP_ +#define _FC_IP_HPP_ +#include + +namespace fc { + + namespace ip { + class address { + public: + address( uint32_t _ip = 0 ); + address( const fc::string& s ); + + address& operator=( const fc::string& s ); + operator fc::string()const; + + private: + uint32_t _ip; + }; + + class endpoint { + public: + endpoint(); + endpoint( const address& i, uint16_t p = 0); + + /** Converts "IP:PORT" to an endpoint */ + static endpoint from_string( const string& s ); + /** returns "IP:PORT" */ + operator string()const; + + uint16_t port()const; + const address& get_address()const; + + private: + uint16_t _port; + address _ip; + }; + } +} +#endif // _FC_ENDPOINT_HPP_ diff --git a/include/fc/mutex.hpp b/include/fc/mutex.hpp new file mode 100644 index 0000000..69b18ab --- /dev/null +++ b/include/fc/mutex.hpp @@ -0,0 +1,110 @@ +#ifndef FC_MUTEX_HPP_ +#define FC_MUTEX_HPP_ +#include +#include + +namespace fc { + class microseconds; + class time_point; + struct context; + + /** + * @brief mutex + * + * This mutex has an advantage over boost::mutex in that it does + * not involve any system calls, even in contention. + * + * Uncontensted access is a simple compare and swap, no delay. + * + * Contested access by different fibers in the same thread simply + * yields the thread until the lock is available. Actual delay + * is subject to the cooperative nature of other tasks in the + * fiber's thread. + * + * Contested access by different threads requires a spin lock + * while the task is enqueued. Because the enqueue action is + * well-defined and 'short-lived' time spent 'spinning' should + * be minimal. + * + * Cooperatively multi-tasked code must still worry about + * reentrancy. Suppose you have a thread sending a message across a socket, + * the socket members are thread safe, but the write_message() operation is + * not rentrant because the context could yield while waiting for a partial + * write to complete. + * + * If while it has yielded another task in the same thread attempts to write + * a second message then you will get garbage out as both fibers take + * turns writing parts of their messages out of the socket. + * + * Example problem: + * @code + * async(write_message); + * async(write_message); + * void write_message() { + * sock->write(part1); // may yield + * sock->write(part2); // may yield + * sock->write(part3); // may yield + * } + * @endcode + * + * The output could look something like: + * + * @code + * part1 + * part2 + * part1 + * part3 + * part2 + * part3 + * @endcode + * + * What you want to happen is this: + * + * @code + * void write_message() { + * boost::unique_lock lock(sock->write_lock); + * sock->write(part1); // may yield + * sock->write(part2); // may yield + * sock->write(part3); // may yield + * } + * @endcode + * + * Now if while writing the first message, someone attempts to + * write a second message second write will 'queue' behind the + * first by 'blocking' on the mutex. + * + * As a result we now have to extend the normal discussion on thread-safe vs reentrant. + * + * - prempt-thread-safe : the code may be called by multiple os threads at the same time. + * - coop-thread-safe : the code may be called by multiple contexts within the same thread. + * - thread-unsafe : the code may only be called by one context at a time for a set of data. + * + * In the example above, before we added the mutex the code was thread-unsafe + * After we added the mutex the code became coop-thread-safe, and potentially prempt-thread-safe + * + * To be preempt-thread-safe any operations must be atomic or protected by a lock because + * the OS could switch you out between any two instructions. + * + * To be coop-thread-safe all operations are 'atomic' unless they span a 'yield'. If they + * span a yield (such as writing parts of a message), then a mutex is required. + * + */ + class mutex { + public: + mutex(); + ~mutex(); + + bool try_lock(); + bool try_lock_for( const microseconds& rel_time ); + bool try_lock_until( const time_point& abs_time ); + void lock(); + void unlock(); + + private: + fc::spin_yield_lock m_blist_lock; + fc::context* m_blist; + }; + +} // namespace fc + +#endif // MACE_CMT_MUTEX_HPP_ diff --git a/include/fc/sha1.hpp b/include/fc/sha1.hpp index 16648e8..4e3a529 100644 --- a/include/fc/sha1.hpp +++ b/include/fc/sha1.hpp @@ -44,10 +44,12 @@ namespace fc { return ds; } friend sha1 operator << ( const sha1& h1, uint32_t i ); - friend sha1 operator ^ ( const sha1& h1, const sha1 h2 ); - friend bool operator >= ( const sha1& h1, const sha1 h2 ); - friend bool operator > ( const sha1& h1, const sha1 h2 ); - private: + friend bool operator == ( const sha1& h1, const sha1& h2 ); + friend bool operator != ( const sha1& h1, const sha1& h2 ); + friend sha1 operator ^ ( const sha1& h1, const sha1& h2 ); + friend bool operator >= ( const sha1& h1, const sha1& h2 ); + friend bool operator > ( const sha1& h1, const sha1& h2 ); + uint32_t _hash[5]; }; diff --git a/include/fc/thread.hpp b/include/fc/thread.hpp index e81e95a..fdb4b3d 100644 --- a/include/fc/thread.hpp +++ b/include/fc/thread.hpp @@ -95,6 +95,7 @@ namespace fc { * @return true unless quit() has been called. */ bool is_running()const; + bool is_current()const; priority current_priority()const; ~thread(); @@ -103,6 +104,7 @@ namespace fc { thread( class thread_d* ); friend class promise_base; friend class thread_d; + friend class mutex; friend void yield(); friend void usleep(const microseconds&); friend void sleep_until(const time_point&); diff --git a/include/fc/unique_lock.hpp b/include/fc/unique_lock.hpp index 07e5984..c1531fe 100644 --- a/include/fc/unique_lock.hpp +++ b/include/fc/unique_lock.hpp @@ -2,6 +2,8 @@ #define _FC_UNIQUE_LOCK_HPP_ namespace fc { + struct try_to_lock_t{}; + class time_point; /** * Including Boost's unique lock drastically increases compile times @@ -10,11 +12,15 @@ namespace fc { template class unique_lock { public: - unique_lock( T& l ):_lock(l) { _lock.lock(); } - ~unique_lock() { _lock.unlock(); } + unique_lock( T& l, const fc::time_point& abs ):_lock(l) { _locked = _lock.try_lock_until(abs); } + unique_lock( T& l, try_to_lock_t ):_lock(l) { _locked = _lock.try_lock(); } + unique_lock( T& l ):_lock(l) { _lock.lock(); _locked = true; } + ~unique_lock() { _lock.unlock(); _locked = false; } + operator bool()const { return _locked; } private: unique_lock( const unique_lock& ); unique_lock& operator=( const unique_lock& ); + bool _locked; T& _lock; }; diff --git a/src/context.hpp b/src/context.hpp index dc1060c..bff9ae9 100644 --- a/src/context.hpp +++ b/src/context.hpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace fc { class thread; @@ -110,13 +111,13 @@ namespace fc { void timeout_blocking_promises() { for( auto i = blocking_prom.begin(); i != blocking_prom.end(); ++i ) { - i->prom->set_exception( boost::copy_exception( future_wait_timeout() ) ); + i->prom->set_exception( fc::copy_exception( future_wait_timeout() ) ); } } template void except_blocking_promises( const Exception& e ) { for( auto i = blocking_prom.begin(); i != blocking_prom.end(); ++i ) { - i->prom->set_exception( boost::copy_exception( e ) ); + i->prom->set_exception( fc::copy_exception( e ) ); } } void clear_blocking_promises() { diff --git a/src/filesystem.cpp b/src/filesystem.cpp new file mode 100644 index 0000000..9a75506 --- /dev/null +++ b/src/filesystem.cpp @@ -0,0 +1,55 @@ +#include +#include +#include +#include + +namespace fc { + + path::path(){} + path::~path(){}; + path::path( const boost::filesystem::path& p ) + :_p(p){} + + path::path( const fc::string& p ) + :_p(p.c_str()){} + + path::path( const path& p ) + :_p(p){} + + path::path( path&& p ) + :_p(std::move(p)){} + + path& path::operator =( const path& p ) { + *_p = *p._p; + return *this; + } + path& path::operator =( path&& p ) { + *_p = fc::move( *p._p ); + return *this; + } + + path& path::operator /=( const fc::path& p ) { + *_p /= *p._p; + return *this; + } + path operator /( const fc::path& p, const fc::path& o ) { + path tmp; + tmp = *p._p / *o._p; + return tmp; + } + + path::operator boost::filesystem::path& () { + return static_cast(*this); + } + path::operator const boost::filesystem::path& ()const { + return static_cast(*this); + } + + fc::string path::string()const { + return _p->string().c_str(); + } + + + bool exists( const path& p ) { return boost::filesystem::exists(p); } + void create_directories( const path& p ) { boost::filesystem::create_directories(p); } +} diff --git a/src/ip.cpp b/src/ip.cpp new file mode 100644 index 0000000..549ac7a --- /dev/null +++ b/src/ip.cpp @@ -0,0 +1,46 @@ +#include +#include +#include +#include + +namespace fc { namespace ip { + + address::address( uint32_t ip ) + :_ip(ip){} + + address::address( const fc::string& s ) { + _ip = boost::asio::ip::address_v4::from_string(s.c_str()).to_ulong(); + } + + address& address::operator=( const fc::string& s ) { + _ip = boost::asio::ip::address_v4::from_string(s.c_str()).to_ulong(); + return *this; + } + + address::operator fc::string()const { + return boost::asio::ip::address_v4(_ip).to_string().c_str(); + } + + + endpoint::endpoint() + :_port(0){} + endpoint::endpoint(const address& a, uint16_t p) + :_port(p),_ip(a){} + + uint16_t endpoint::port()const { return _port; } + const address& endpoint::get_address()const { return _ip; } + + endpoint endpoint::from_string( const string& s ) { + endpoint ep; + const std::string& st = reinterpret_cast(s); + auto pos = st.find(':'); + ep._ip = boost::asio::ip::address_v4::from_string(st.substr( 0, pos ) ).to_ulong(); + ep._port = boost::lexical_cast( st.substr( pos+1, s.size() ) ); + return ep; + } + + endpoint::operator string()const { + return string(_ip) + ':' + boost::lexical_cast(_port); + } + +} } diff --git a/src/mutex.cpp b/src/mutex.cpp new file mode 100644 index 0000000..80f5d1b --- /dev/null +++ b/src/mutex.cpp @@ -0,0 +1,167 @@ +#include +#include +#include +#include +#include "context.hpp" +#include "thread_d.hpp" + +namespace fc { + + mutex::mutex() + :m_blist(0){} + + mutex::~mutex() { + if( m_blist ) { + auto c = m_blist; + fc::thread::current().debug("~mutex"); + while( c ) { + elog( "still blocking on context %p (%s)", m_blist, (m_blist->cur_task ? m_blist->cur_task->get_desc() : "no current task") ); + c = c->next_blocked; + } + } + BOOST_ASSERT( !m_blist && "Attempt to free mutex while others are blocking on lock." ); + } + + /** + * @param next - is set to the next context to get the lock. + * @return the last context (the one with the lock) + */ + static fc::context* get_tail( fc::context* h, fc::context*& next ) { + next = 0; + fc::context* n = h; + if( !n ) return n; + while( n->next_blocked ) { + next = n; + n=n->next_blocked; + } + return n; + } + static fc::context* remove( fc::context* head, fc::context* target ) { + fc::context* c = head; + fc::context* p = 0; + while( c ) { + if( c == target ) { + if( p ) { + p->next_blocked = c->next_blocked; + return head; + } + return c->next_blocked; + } + p = c; + c = c->next_blocked; + } + return head; + } + static void cleanup( fc::mutex& m, fc::spin_yield_lock& syl, fc::context*& bl, fc::context* cc ) { + { + fc::unique_lock lock(syl); + if( cc->next_blocked ) { + bl = remove(bl, cc ); + return; + } + } + m.unlock(); + } + + /** + * A mutex is considered to hold the lock when + * the current context is the tail in the wait queue. + */ + bool mutex::try_lock() { + fc::thread* ct = &fc::thread::current(); + fc::context* cc = ct->my->current; + fc::context* n = 0; + + fc::unique_lock lock(m_blist_lock, fc::try_to_lock_t()); + if( !lock ) + return false; + + if( !m_blist ) { + m_blist = cc; + return true; + } + // allow recursive locks. + return ( get_tail( m_blist, n ) == cc ); + } + + bool mutex::try_lock_until( const fc::time_point& abs_time ) { + fc::context* n = 0; + fc::context* cc = fc::thread::current().my->current; + + { // lock scope + fc::unique_lock lock(m_blist_lock,abs_time); + if( !lock ) return false; + + if( !m_blist ) { + m_blist = cc; + return true; + } + + // allow recusive locks + if ( get_tail( m_blist, n ) == cc ) + return true; + + cc->next_blocked = m_blist; + m_blist = cc; + } // end lock scope + try { + fc::thread::current().my->yield_until( abs_time, false ); + return( 0 == cc->next_blocked ); + } catch (...) { + cleanup( *this, m_blist_lock, m_blist, cc); + throw; + } + } + + void mutex::lock() { + fc::context* n = 0; + fc::context* cc = fc::thread::current().my->current; + { + boost::unique_lock lock(m_blist_lock); + if( !m_blist ) { + m_blist = cc; + return; + } + + // allow recusive locks + if ( get_tail( m_blist, n ) == cc ) { + return; + } + cc->next_blocked = m_blist; + m_blist = cc; + + int cnt = 0; + auto i = m_blist; + while( i ) { + i = i->next_blocked; + ++cnt; + } + wlog( "wait queue len %1%", cnt ); + } + + try { + fc::thread::current().yield(false); + BOOST_ASSERT( cc->next_blocked == 0 ); + } catch ( ... ) { + wlog( "lock with throw %p %s",this, fc::current_exception().diagnostic_information().c_str() ); + cleanup( *this, m_blist_lock, m_blist, cc); + throw; + } + } + + void mutex::unlock() { + fc::context* next = 0; + { boost::unique_lock lock(m_blist_lock); + get_tail(m_blist, next); + if( next ) { + next->next_blocked = 0; + next->ctx_thread->my->unblock( next ); + } else { + m_blist = 0; + } + } + } + +} // fc + + diff --git a/src/sha1.cpp b/src/sha1.cpp index 5ebf594..91cd406 100644 --- a/src/sha1.cpp +++ b/src/sha1.cpp @@ -16,30 +16,8 @@ namespace fc { } sha1::operator fc::string()const { return str(); } - sha1 operator << ( const sha1& h1, uint32_t i ) { - sha1 result; - uint8_t* r = (uint8_t*)result._hash; - uint8_t* s = (uint8_t*)h1._hash; - for( uint32_t p = 0; p < sizeof(h1._hash)-1; ++p ) - r[p] = s[p] << i | (s[p+1]>>(8-i)); - r[19] = s[19] << i; - return result; - } - sha1 operator ^ ( const sha1& h1, const sha1 h2 ) { - sha1 result; - result._hash[0] = h1._hash[0] ^ h2._hash[0]; - result._hash[1] = h1._hash[1] ^ h2._hash[1]; - result._hash[2] = h1._hash[2] ^ h2._hash[2]; - result._hash[3] = h1._hash[3] ^ h2._hash[3]; - result._hash[4] = h1._hash[4] ^ h2._hash[4]; - return result; - } - bool operator >= ( const sha1& h1, const sha1 h2 ) { - return memcmp( h1._hash, h2._hash, sizeof(h1._hash) ) >= 0; - } - bool operator > ( const sha1& h1, const sha1 h2 ) { - return memcmp( h1._hash, h2._hash, sizeof(h1._hash) ) > 0; - } + char* sha1::data()const { return (char*)&_hash[0]; } + struct sha1::encoder::impl { SHA_CTX ctx; @@ -49,6 +27,15 @@ namespace fc { reset(); } + sha1 sha1::hash( const char* d, uint32_t dlen ) { + encoder e; + e.write(d,dlen); + return e.result(); + } + sha1 sha1::hash( const fc::string& s ) { + return hash( s.c_str(), s.size() ); + } + void sha1::encoder::write( const char* d, uint32_t dlen ) { SHA1_Update( &my->ctx, d, dlen); } @@ -60,5 +47,36 @@ namespace fc { void sha1::encoder::reset() { SHA1_Init( &my->ctx); } + + fc::sha1 operator << ( const fc::sha1& h1, uint32_t i ) { + fc::sha1 result; + uint8_t* r = (uint8_t*)result._hash; + uint8_t* s = (uint8_t*)h1._hash; + for( uint32_t p = 0; p < sizeof(h1._hash)-1; ++p ) + r[p] = s[p] << i | (s[p+1]>>(8-i)); + r[19] = s[19] << i; + return result; + } + fc::sha1 operator ^ ( const fc::sha1& h1, const fc::sha1& h2 ) { + fc::sha1 result; + result._hash[0] = h1._hash[0] ^ h2._hash[0]; + result._hash[1] = h1._hash[1] ^ h2._hash[1]; + result._hash[2] = h1._hash[2] ^ h2._hash[2]; + result._hash[3] = h1._hash[3] ^ h2._hash[3]; + result._hash[4] = h1._hash[4] ^ h2._hash[4]; + return result; + } + bool operator >= ( const fc::sha1& h1, const fc::sha1& h2 ) { + return memcmp( h1._hash, h2._hash, sizeof(h1._hash) ) >= 0; + } + bool operator > ( const fc::sha1& h1, const fc::sha1& h2 ) { + return memcmp( h1._hash, h2._hash, sizeof(h1._hash) ) > 0; + } + bool operator != ( const fc::sha1& h1, const fc::sha1& h2 ) { + return memcmp( h1._hash, h2._hash, sizeof(h1._hash) ) != 0; + } + bool operator == ( const fc::sha1& h1, const fc::sha1& h2 ) { + return memcmp( h1._hash, h2._hash, sizeof(h1._hash) ) == 0; + } -} +} // namespace fc diff --git a/src/thread.cpp b/src/thread.cpp index da13d54..ba7c06b 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -325,6 +325,9 @@ namespace fc { } + bool thread::is_current()const { + return this == ¤t(); + } } diff --git a/src/thread_d.hpp b/src/thread_d.hpp index dad5658..a601e54 100644 --- a/src/thread_d.hpp +++ b/src/thread_d.hpp @@ -359,6 +359,13 @@ namespace fc { return time_point::min(); } + void unblock( fc::context* c ) { + if( fc::thread::current().my != this ) { + async( [=](){ unblock(c); } ); + return; + } + ready_push_front(c); + } void yield_until( const time_point& tp, bool reschedule ) { check_fiber_exceptions();