From 0ace4298c58c091469ebf9aca5e45a5b104089f8 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Mon, 10 Oct 2016 17:16:57 -0400 Subject: [PATCH] adding interprocess lock --- CMakeLists.txt | 4 + include/fc/interprocess/file_mutex.hpp | 42 +++++++++++ src/interprocess/file_mutex.cpp | 100 +++++++++++++++++++++++++ tests/bip_lock.cpp | 44 +++++++++++ 4 files changed, 190 insertions(+) create mode 100644 include/fc/interprocess/file_mutex.hpp create mode 100644 src/interprocess/file_mutex.cpp create mode 100644 tests/bip_lock.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 269486b..0e24f7a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -187,6 +187,7 @@ set( fc_sources src/interprocess/signals.cpp src/interprocess/file_mapping.cpp src/interprocess/mmap_struct.cpp + src/interprocess/file_mutex.cpp src/rpc/cli.cpp src/rpc/http_api.cpp src/rpc/json_connection.cpp @@ -374,6 +375,9 @@ add_definitions(-DBOOST_TEST_DYN_LINK) ENDIF(MSVC) ENDIF() +add_executable( bip_lock tests/bip_lock.cpp ) +target_link_libraries( bip_lock fc ) + add_executable( api tests/api.cpp ) target_link_libraries( api fc ) diff --git a/include/fc/interprocess/file_mutex.hpp b/include/fc/interprocess/file_mutex.hpp new file mode 100644 index 0000000..0379074 --- /dev/null +++ b/include/fc/interprocess/file_mutex.hpp @@ -0,0 +1,42 @@ +#pragma once +#include +#include + +namespace fc { + class microseconds; + class time_point; + class path; + struct context; + + namespace detail { class file_mutex_impl; } + + /** + * The purpose of this class is to support synchronization of + * processes, threads, and coop-threads. + * + * Before grabbing the lock for a thread or coop, a file_mutex will first + * grab a process-level lock. After grabbing the process level lock, it will + * synchronize in the same way as a local process lock. + */ + class file_mutex { + public: + file_mutex( const fc::path& filename ); + ~file_mutex(); + + bool try_lock(); + bool try_lock_for( const microseconds& rel_time ); + bool try_lock_until( const time_point& abs_time ); + void lock(); + void unlock(); + + void lock_shared(); + void unlock_shared(); + bool try_lock_shared(); + + int readers()const; + + private: + std::unique_ptr my; + }; + +} // namespace fc diff --git a/src/interprocess/file_mutex.cpp b/src/interprocess/file_mutex.cpp new file mode 100644 index 0000000..72b142d --- /dev/null +++ b/src/interprocess/file_mutex.cpp @@ -0,0 +1,100 @@ +#include +//#include +#include +#include +#include +#include +#include + +#include +#include + +namespace fc { + namespace bip = boost::interprocess; + + void yield(); + + namespace detail { + class file_mutex_impl { + public: + file_mutex_impl( const char* f ) + :_file_mutex( f ),_reader_count(0){} + + fc::mutex _write_lock; + bip::file_lock _file_mutex; + boost::atomic _reader_count; + }; + } + + file_mutex::file_mutex( const fc::path& file ) + { + my.reset( new detail::file_mutex_impl( file.generic_string().c_str() ) ); + } + + file_mutex::~file_mutex() { + } + + int file_mutex::readers()const { + return my->_reader_count.load(); + } + + bool file_mutex::try_lock() { + return false; + if( my->_write_lock.try_lock() ) { + if( my->_file_mutex.try_lock() ) + return true; + } + if( my->_file_mutex.try_lock() ) { + if( my->_write_lock.try_lock() ) { + return true; + } else { + my->_file_mutex.unlock(); + } + } + return false; + } + + bool file_mutex::try_lock_for( const microseconds& rel_time ) { + return false; + } + + bool file_mutex::try_lock_until( const time_point& abs_time ) { + return false; + } + + void file_mutex::lock() { + my->_write_lock.lock(); + while( my->_reader_count.load() > 0 ) { + fc::usleep( fc::microseconds(10) ); + } + my->_file_mutex.lock(); + } + + void file_mutex::unlock() { + my->_file_mutex.unlock(); + my->_write_lock.unlock(); + } + + void file_mutex::lock_shared() { + bip::scoped_lock< fc::mutex > lock( my->_write_lock ); + if( 0 == my->_reader_count.fetch_add( 1, boost::memory_order_relaxed ) ) + my->_file_mutex.lock_sharable(); + } + + void file_mutex::unlock_shared() { + if( 1 == my->_reader_count.fetch_add( -1, boost::memory_order_relaxed ) ) + my->_file_mutex.unlock_sharable(); + } + + bool file_mutex::try_lock_shared() { + return false; + if( my->_write_lock.try_lock() ) { + if( my->_reader_count.load() == 0 && my->_file_mutex.try_lock_sharable() ) { + my->_reader_count++; + } + my->_write_lock.unlock(); + } + return false; + } + +} // namespace fc diff --git a/tests/bip_lock.cpp b/tests/bip_lock.cpp new file mode 100644 index 0000000..38bf684 --- /dev/null +++ b/tests/bip_lock.cpp @@ -0,0 +1,44 @@ +#include +#include +#include +#include +#include + +int main( int argc, char** argv ) { + if( argc < 2 ) return 0; + fc::file_mutex m( argv[1] ); + auto mptr = &m; + + fc::thread in("in"); + + std::string cmd; + std::cout << ">>> "; + std::cin >> cmd; + int i = 0; + while( !std::cin.eof() && cmd != "q" ) { + ++i; + fc::async( [i, cmd,mptr]() { + ilog( "start ${c} ${i}", ("c",cmd)("i",i) ); + if( cmd == "L" ) { + mptr->lock(); + } else if( cmd == "l" ) { + mptr->lock_shared(); + } else if( cmd == "U" ) { + mptr->unlock(); + } else if( cmd == "u" ) { + mptr->unlock_shared(); + } + ilog( "end ${c} ${i}", ("c",cmd)("i",i) ); + } ); + fc::usleep( fc::microseconds( 1000 ) ); + cmd = in.async( [&]() { + std::string tmp; + wdump((m.readers())); + std::cin >> tmp; + return tmp; + } ); + } + std::cout << "done"; + + return 0; +}