adding interprocess lock

This commit is contained in:
Daniel Larimer 2016-10-10 17:16:57 -04:00
parent d352463e82
commit 0ace4298c5
4 changed files with 190 additions and 0 deletions

View file

@ -187,6 +187,7 @@ set( fc_sources
src/interprocess/signals.cpp
src/interprocess/file_mapping.cpp
src/interprocess/mmap_struct.cpp
src/interprocess/file_mutex.cpp
src/rpc/cli.cpp
src/rpc/http_api.cpp
src/rpc/json_connection.cpp
@ -374,6 +375,9 @@ add_definitions(-DBOOST_TEST_DYN_LINK)
ENDIF(MSVC)
ENDIF()
add_executable( bip_lock tests/bip_lock.cpp )
target_link_libraries( bip_lock fc )
add_executable( api tests/api.cpp )
target_link_libraries( api fc )

View file

@ -0,0 +1,42 @@
#pragma once
#include <fc/time.hpp>
#include <fc/thread/spin_yield_lock.hpp>
namespace fc {
class microseconds;
class time_point;
class path;
struct context;
namespace detail { class file_mutex_impl; }
/**
* The purpose of this class is to support synchronization of
* processes, threads, and coop-threads.
*
* Before grabbing the lock for a thread or coop, a file_mutex will first
* grab a process-level lock. After grabbing the process level lock, it will
* synchronize in the same way as a local process lock.
*/
class file_mutex {
public:
file_mutex( const fc::path& filename );
~file_mutex();
bool try_lock();
bool try_lock_for( const microseconds& rel_time );
bool try_lock_until( const time_point& abs_time );
void lock();
void unlock();
void lock_shared();
void unlock_shared();
bool try_lock_shared();
int readers()const;
private:
std::unique_ptr<detail::file_mutex_impl> my;
};
} // namespace fc

View file

@ -0,0 +1,100 @@
#include <fc/interprocess/file_mutex.hpp>
//#include <fc/thread/mutex.hpp>
#include <fc/thread/mutex.hpp>
#include <fc/filesystem.hpp>
#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/atomic.hpp>
#include <fc/thread/thread.hpp>
#include <fc/log/logger.hpp>
namespace fc {
namespace bip = boost::interprocess;
void yield();
namespace detail {
class file_mutex_impl {
public:
file_mutex_impl( const char* f )
:_file_mutex( f ),_reader_count(0){}
fc::mutex _write_lock;
bip::file_lock _file_mutex;
boost::atomic<int> _reader_count;
};
}
file_mutex::file_mutex( const fc::path& file )
{
my.reset( new detail::file_mutex_impl( file.generic_string().c_str() ) );
}
file_mutex::~file_mutex() {
}
int file_mutex::readers()const {
return my->_reader_count.load();
}
bool file_mutex::try_lock() {
return false;
if( my->_write_lock.try_lock() ) {
if( my->_file_mutex.try_lock() )
return true;
}
if( my->_file_mutex.try_lock() ) {
if( my->_write_lock.try_lock() ) {
return true;
} else {
my->_file_mutex.unlock();
}
}
return false;
}
bool file_mutex::try_lock_for( const microseconds& rel_time ) {
return false;
}
bool file_mutex::try_lock_until( const time_point& abs_time ) {
return false;
}
void file_mutex::lock() {
my->_write_lock.lock();
while( my->_reader_count.load() > 0 ) {
fc::usleep( fc::microseconds(10) );
}
my->_file_mutex.lock();
}
void file_mutex::unlock() {
my->_file_mutex.unlock();
my->_write_lock.unlock();
}
void file_mutex::lock_shared() {
bip::scoped_lock< fc::mutex > lock( my->_write_lock );
if( 0 == my->_reader_count.fetch_add( 1, boost::memory_order_relaxed ) )
my->_file_mutex.lock_sharable();
}
void file_mutex::unlock_shared() {
if( 1 == my->_reader_count.fetch_add( -1, boost::memory_order_relaxed ) )
my->_file_mutex.unlock_sharable();
}
bool file_mutex::try_lock_shared() {
return false;
if( my->_write_lock.try_lock() ) {
if( my->_reader_count.load() == 0 && my->_file_mutex.try_lock_sharable() ) {
my->_reader_count++;
}
my->_write_lock.unlock();
}
return false;
}
} // namespace fc

44
tests/bip_lock.cpp Normal file
View file

@ -0,0 +1,44 @@
#include <iostream>
#include <fc/interprocess/file_mutex.hpp>
#include <fc/filesystem.hpp>
#include <fc/log/logger.hpp>
#include <fc/thread/thread.hpp>
int main( int argc, char** argv ) {
if( argc < 2 ) return 0;
fc::file_mutex m( argv[1] );
auto mptr = &m;
fc::thread in("in");
std::string cmd;
std::cout << ">>> ";
std::cin >> cmd;
int i = 0;
while( !std::cin.eof() && cmd != "q" ) {
++i;
fc::async( [i, cmd,mptr]() {
ilog( "start ${c} ${i}", ("c",cmd)("i",i) );
if( cmd == "L" ) {
mptr->lock();
} else if( cmd == "l" ) {
mptr->lock_shared();
} else if( cmd == "U" ) {
mptr->unlock();
} else if( cmd == "u" ) {
mptr->unlock_shared();
}
ilog( "end ${c} ${i}", ("c",cmd)("i",i) );
} );
fc::usleep( fc::microseconds( 1000 ) );
cmd = in.async( [&]() {
std::string tmp;
wdump((m.readers()));
std::cin >> tmp;
return tmp;
} );
}
std::cout << "done";
return 0;
}