Updates from BitShares FC #22

Closed
nathanielhourt wants to merge 693 commits from dapp-support into latest-fc
16 changed files with 793 additions and 98 deletions
Showing only changes of commit 460e7cccc8 - Show all commits

View file

@ -217,6 +217,7 @@ set( fc_sources
src/thread/spin_lock.cpp
src/thread/spin_yield_lock.cpp
src/thread/mutex.cpp
src/thread/parallel.cpp
src/thread/non_preemptable_scope_check.cpp
src/asio.cpp
src/string.cpp

View file

@ -19,7 +19,6 @@ namespace asio {
* @brief internal implementation types/methods for fc::asio
*/
namespace detail {
using namespace fc;
class read_write_handler
{
@ -59,14 +58,14 @@ namespace asio {
bool operator()( C& c, bool s ) { c.non_blocking(s); return true; }
};
#if WIN32 // windows stream handles do not support non blocking!
#if WIN32 // windows stream handles do not support non blocking!
template<>
struct non_blocking<boost::asio::windows::stream_handle> {
typedef boost::asio::windows::stream_handle C;
bool operator()( C& ) { return false; }
bool operator()( C&, bool ) { return false; }
};
#endif
#endif
} // end of namespace detail
/***
@ -78,7 +77,8 @@ namespace asio {
public:
default_io_service_scope();
~default_io_service_scope();
static void set_num_threads(uint16_t num_threads);
static void set_num_threads(uint16_t num_threads);
static uint16_t get_num_threads();
boost::asio::io_service* io;
private:
std::vector<boost::thread*> asio_threads;

View file

@ -1,12 +0,0 @@
#pragma once
#include <deque>
#include <fc/io/raw.hpp>
namespace fc {
namespace raw {
} // namespace raw
} // namespace fc

View file

@ -2,6 +2,8 @@
#include <fc/fwd.hpp>
#include <fc/string.hpp>
#include <functional>
namespace fc{
class sha1
@ -82,3 +84,6 @@ namespace std
}
};
}
#include <fc/reflect/reflect.hpp>
FC_REFLECT_TYPENAME( fc::sha1 )

View file

@ -21,7 +21,6 @@
#endif
namespace fc {
class abstract_thread;
struct void_t{};
class priority;
class thread;
@ -146,7 +145,9 @@ namespace fc {
public:
typedef fc::shared_ptr< promise<void> > ptr;
promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){}
//promise( const void_t& ){ set_value(); }
promise( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG ){
if( fulfilled ) set_value();
}
void wait(const microseconds& timeout = microseconds::maximum() ){
this->_wait( timeout );

View file

@ -0,0 +1,106 @@
/*
* Copyright (c) 2018 The BitShares Blockchain, and contributors.
*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#pragma once
#include <fc/thread/task.hpp>
#include <fc/thread/thread.hpp>
#include <fc/asio.hpp>
#include <boost/atomic/atomic.hpp>
namespace fc {
namespace detail {
class pool_impl;
class worker_pool {
public:
worker_pool();
~worker_pool();
void post( task_base* task );
private:
pool_impl* my;
};
worker_pool& get_worker_pool();
}
class serial_valve {
private:
class ticket_guard {
public:
explicit ticket_guard( boost::atomic<future<void>*>& latch );
~ticket_guard();
void wait_for_my_turn();
private:
promise<void>* my_promise;
future<void>* ticket;
};
friend class ticket_guard;
boost::atomic<future<void>*> latch;
public:
serial_valve();
~serial_valve();
/** Executes f1() then f2().
* For any two calls do_serial(f1,f2) and do_serial(f1',f2') where
* do_serial(f1,f2) is invoked before do_serial(f1',f2'), it is
* guaranteed that f2' will be executed after f2 has completed. Failure
* of either function counts as completion of both.
* If f1 throws then f2 will not be invoked.
*
* @param f1 a functor to invoke
* @param f2 a functor to invoke
* @return the return value of f2()
*/
template<typename Functor1,typename Functor2>
auto do_serial( const Functor1& f1, const Functor2& f2 ) -> decltype(f2())
{
ticket_guard guard( latch );
f1();
guard.wait_for_my_turn();
return f2();
}
};
/**
* Calls function <code>f</code> in a separate thread and returns a future
* that can be used to wait on the result.
*
* @param f the operation to perform
*/
template<typename Functor>
auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future<decltype(f())> {
typedef decltype(f()) Result;
typedef typename fc::deduce<Functor>::type FunctorType;
fc::task<Result,sizeof(FunctorType)>* tsk =
new fc::task<Result,sizeof(FunctorType)>( fc::forward<Functor>(f), desc );
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
detail::get_worker_pool().post( tsk );
return r;
}
}

View file

@ -25,6 +25,7 @@ namespace fc {
};
void* get_task_specific_data(unsigned slot);
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
class idle_guard;
}
class task_base : virtual public promise_base {
@ -53,6 +54,7 @@ namespace fc {
// thread/thread_private
friend class thread;
friend class thread_d;
friend class detail::idle_guard;
fwd<spin_lock,8> _spinlock;
// avoid rtti info for every possible functor...

View file

@ -12,6 +12,7 @@ namespace fc {
namespace detail
{
class worker_pool;
void* get_thread_specific_data(unsigned slot);
void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
unsigned get_next_unused_task_storage_slot();
@ -19,11 +20,29 @@ namespace fc {
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
}
/** Instances of this class can be used to get notifications when a thread is
* (or is no longer) idle.
*/
class thread_idle_notifier {
public:
virtual ~thread_idle_notifier() {}
/** This method is called when the thread is idle. If it returns a
* task_base it will be queued and executed immediately.
* @return a task to execute, or nullptr
*/
virtual task_base* idle() = 0;
/** This method is called when the thread is no longer idle, e. g. after
* it has woken up due to a timer or signal.
*/
virtual void busy() = 0;
};
class thread {
public:
thread( const std::string& name = "" );
thread( thread&& m );
thread& operator=(thread&& t );
thread( const std::string& name = "", thread_idle_notifier* notifier = 0 );
thread( thread&& m ) = delete;
thread& operator=(thread&& t ) = delete;
/**
* Returns the current thread.
@ -130,11 +149,12 @@ namespace fc {
return wait_any_until(fc::move(proms), fc::time_point::now()+timeout_us );
}
private:
thread( class thread_d* );
thread( class thread_d* ); // parameter is ignored, will create a new thread_d
friend class promise_base;
friend class task_base;
friend class thread_d;
friend class mutex;
friend class detail::worker_pool;
friend void* detail::get_thread_specific_data(unsigned slot);
friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
friend unsigned detail::get_next_unused_task_storage_slot();

View file

@ -58,7 +58,6 @@ namespace fc {
}
else
{
//elog( "${message} ", ("message", boost::system::system_error(ec).what()));
p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
@ -83,8 +82,6 @@ namespace fc {
}
p->set_value( eps );
} else {
//elog( "%s", boost::system::system_error(ec).what() );
//p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) );
p->set_exception(
fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "process exited with: ${message} ",
@ -104,10 +101,12 @@ namespace fc {
* @param num_threads the number of threads
*/
void default_io_service_scope::set_num_threads(uint16_t num_threads) {
FC_ASSERT(fc::asio::default_io_service_scope::num_io_threads == 0);
fc::asio::default_io_service_scope::num_io_threads = num_threads;
FC_ASSERT(num_io_threads == 0);
num_io_threads = num_threads;
}
uint16_t default_io_service_scope::get_num_threads() { return num_io_threads; }
/***
* Default constructor
*/
@ -116,18 +115,18 @@ namespace fc {
io = new boost::asio::io_service();
the_work = new boost::asio::io_service::work(*io);
if (this->num_io_threads == 0)
if( num_io_threads == 0 )
{
// the default was not set by the configuration. Determine a good
// number of threads. Minimum of 8, maximum of hardware_concurrency
this->num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u );
num_io_threads = std::max( boost::thread::hardware_concurrency(), 8U );
}
for( uint16_t i = 0; i < this->num_io_threads; ++i )
for( uint16_t i = 0; i < num_io_threads; ++i )
{
asio_threads.push_back( new boost::thread( [=]()
asio_threads.push_back( new boost::thread( [i,this]()
{
fc::thread::current().set_name("asio");
fc::thread::current().set_name( "fc::asio worker #" + fc::to_string(i) );
BOOST_SCOPE_EXIT(void)
{
@ -194,7 +193,7 @@ namespace fc {
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") );
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait();;
return p->wait();
}
FC_RETHROW_EXCEPTIONS(warn, "")
}

202
src/thread/parallel.cpp Normal file
View file

@ -0,0 +1,202 @@
/*
* Copyright (c) 2018 The BitShares Blockchain, and contributors.
*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <fc/thread/parallel.hpp>
#include <fc/thread/spin_yield_lock.hpp>
#include <fc/thread/unique_lock.hpp>
#include <fc/asio.hpp>
#include <boost/atomic/atomic.hpp>
#include <boost/lockfree/queue.hpp>
namespace fc {
namespace detail {
class idle_notifier_impl : public thread_idle_notifier
{
public:
idle_notifier_impl()
{
is_idle.store(false);
}
idle_notifier_impl( const idle_notifier_impl& copy )
{
id = copy.id;
my_pool = copy.my_pool;
is_idle.store( copy.is_idle.load() );
}
virtual ~idle_notifier_impl() {}
virtual task_base* idle();
virtual void busy()
{
is_idle.store(false);
}
uint32_t id;
pool_impl* my_pool;
boost::atomic<bool> is_idle;
};
class pool_impl
{
public:
explicit pool_impl( const uint16_t num_threads )
: idle_threads( 2 * num_threads ), waiting_tasks( 200 )
{
notifiers.resize( num_threads );
threads.reserve( num_threads );
for( uint32_t i = 0; i < num_threads; i++ )
{
notifiers[i].id = i;
notifiers[i].my_pool = this;
threads.push_back( new thread( "pool worker " + fc::to_string(i), &notifiers[i] ) );
}
}
~pool_impl()
{
for( thread* t : threads)
delete t; // also calls quit()
waiting_tasks.consume_all( [] ( task_base* t ) {
t->cancel( "thread pool quitting" );
});
}
thread* post( task_base* task )
{
idle_notifier_impl* ini;
while( idle_threads.pop( ini ) )
if( ini->is_idle.exchange( false ) )
{ // minor race condition here, a thread might receive a task while it's busy
return threads[ini->id];
}
boost::unique_lock<fc::spin_yield_lock> lock(pool_lock);
while( idle_threads.pop( ini ) )
if( ini->is_idle.exchange( false ) )
{ // minor race condition here, a thread might receive a task while it's busy
return threads[ini->id];
}
while( !waiting_tasks.push( task ) )
elog( "Worker pool internal error" );
return 0;
}
task_base* enqueue_idle_thread( idle_notifier_impl* ini )
{
task_base* task;
if( waiting_tasks.pop( task ) )
return task;
fc::unique_lock<fc::spin_yield_lock> lock(pool_lock);
if( waiting_tasks.pop( task ) )
return task;
while( !idle_threads.push( ini ) )
elog( "Worker pool internal error" );
return 0;
}
private:
std::vector<idle_notifier_impl> notifiers;
std::vector<thread*> threads;
boost::lockfree::queue<idle_notifier_impl*> idle_threads;
boost::lockfree::queue<task_base*> waiting_tasks;
fc::spin_yield_lock pool_lock;
};
task_base* idle_notifier_impl::idle()
{
is_idle.store( true );
task_base* result = my_pool->enqueue_idle_thread( this );
if( result ) is_idle.store( false );
return result;
}
worker_pool::worker_pool()
{
fc::asio::default_io_service();
my = new pool_impl( fc::asio::default_io_service_scope::get_num_threads() );
}
worker_pool::~worker_pool()
{
delete my;
}
void worker_pool::post( task_base* task )
{
thread* worker = my->post( task );
if( worker )
worker->async_task( task, priority() );
}
worker_pool& get_worker_pool()
{
static worker_pool the_pool;
return the_pool;
}
}
serial_valve::ticket_guard::ticket_guard( boost::atomic<future<void>*>& latch )
{
my_promise = new promise<void>();
future<void>* my_future = new future<void>( promise<void>::ptr( my_promise, true ) );
try
{
do
{
ticket = latch.load();
FC_ASSERT( ticket, "Valve is shutting down!" );
}
while( !latch.compare_exchange_weak( ticket, my_future ) );
}
catch (...)
{
delete my_future; // this takes care of my_promise as well
throw;
}
}
serial_valve::ticket_guard::~ticket_guard()
{
my_promise->set_value();
ticket->wait();
delete ticket;
}
void serial_valve::ticket_guard::wait_for_my_turn()
{
ticket->wait();
}
serial_valve::serial_valve()
{
latch.store( new future<void>( promise<void>::ptr( new promise<void>( true ), true ) ) );
}
serial_valve::~serial_valve()
{
fc::future<void>* last = latch.exchange( 0 );
last->wait();
delete last;
}
} // namespace fc

View file

@ -4,6 +4,8 @@
#include <fc/log/logger.hpp>
#include "thread_d.hpp"
#include <iostream>
#if defined(_MSC_VER) && !defined(NDEBUG)
# include <windows.h>
const DWORD MS_VC_EXCEPTION=0x406D1388;
@ -34,7 +36,7 @@ static void set_thread_name(const char* threadName)
{
}
}
#elif defined(__linux__) && !defined(NDEBUG)
#elif defined(__linux__)
# include <pthread.h>
static void set_thread_name(const char* threadName)
{
@ -62,59 +64,58 @@ namespace fc {
}
thread*& current_thread() {
#ifdef _MSC_VER
#ifdef _MSC_VER
static __declspec(thread) thread* t = NULL;
#else
#else
static __thread thread* t = NULL;
#endif
#endif
return t;
}
thread::thread( const std::string& name ) {
thread::thread( const std::string& name, thread_idle_notifier* notifier ) {
promise<void>::ptr p(new promise<void>("thread start"));
boost::thread* t = new boost::thread( [this,p,name]() {
boost::thread* t = new boost::thread( [this,p,name,notifier]() {
try {
set_thread_name(name.c_str()); // set thread's name for the debugger to display
this->my = new thread_d(*this);
this->my = new thread_d( *this, notifier );
current_thread() = this;
p->set_value();
exec();
} catch ( fc::exception& e ) {
wlog( "unhandled exception" );
p->set_exception( e.dynamic_copy_exception() );
if( !p->ready() )
{
wlog( "unhandled exception" );
p->set_exception( e.dynamic_copy_exception() );
}
else
{ // possibly shutdown?
std::cerr << "unhandled exception in thread '" << name << "'\n";
std::cerr << e.to_detail_string( log_level::warn );
}
} catch ( ... ) {
wlog( "unhandled exception" );
p->set_exception( std::make_shared<unhandled_exception>( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) );
//assert( !"unhandled exception" );
//elog( "Caught unhandled exception %s", boost::current_exception_diagnostic_information().c_str() );
if( !p->ready() )
{
wlog( "unhandled exception" );
p->set_exception( std::make_shared<unhandled_exception>( FC_LOG_MESSAGE( warn, "unhandled exception: ${diagnostic}", ("diagnostic",boost::current_exception_diagnostic_information()) ) ) );
}
else
{ // possibly shutdown?
std::cerr << "unhandled exception in thread '" << name << "'\n";
std::cerr << boost::current_exception_diagnostic_information() << "\n";
}
}
} );
p->wait();
my->boost_thread = t;
my->name = name;
//wlog("name:${n} tid:${tid}", ("n", name)("tid", (uintptr_t)my->boost_thread->native_handle()) );
}
thread::thread( thread_d* ) {
my = new thread_d(*this);
}
thread::thread( thread&& m ) {
my = m.my;
m.my = 0;
}
thread& thread::operator=(thread&& t ) {
fc_swap(t.my,my);
return *this;
}
thread::~thread() {
//wlog( "my ${n}", ("n",name()) );
if( my )
{
// wlog( "calling quit() on ${n}",("n",my->name) );
quit();
}
delete my;
}
@ -139,7 +140,7 @@ namespace fc {
{
if (!is_current())
{
async([=](){ set_name(n); }, "set_name").wait();
async([this,n](){ set_name(n); }, "set_name").wait();
return;
}
my->name = n;
@ -162,17 +163,13 @@ namespace fc {
if( !is_current() )
{
auto t = my->boost_thread;
async( [=](){quit();}, "thread::quit" );//.wait();
async( [this](){quit();}, "thread::quit" );
if( t )
{
//wlog("destroying boost thread ${tid}",("tid",(uintptr_t)my->boost_thread->native_handle()));
t->join();
}
return;
}
my->done = true;
// wlog( "${s}", ("s",name()) );
// We are quiting from our own thread...
// break all promises, thread quit!
@ -183,20 +180,14 @@ namespace fc {
{
fc::context* n = cur->next;
// this will move the context into the ready list.
//cur->prom->set_exception( boost::copy_exception( error::thread_quit() ) );
//cur->set_exception_on_blocking_promises( thread_quit() );
cur->set_exception_on_blocking_promises( std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")) );
cur = n;
}
if( my->blocked )
{
//wlog( "still blocking... whats up with that?");
debug( "on quit" );
}
}
BOOST_ASSERT( my->blocked == 0 );
//my->blocked = 0;
for (task_base* unstarted_task : my->task_pqueue)
unstarted_task->set_exception(std::make_shared<canceled_exception>(FC_LOG_MESSAGE(error, "cancellation reason: thread quitting")));
@ -324,7 +315,6 @@ namespace fc {
if( p[i]->ready() )
return i;
//BOOST_THROW_EXCEPTION( wait_any_error() );
return -1;
}
@ -340,8 +330,6 @@ namespace fc {
void thread::async_task( task_base* t, const priority& p, const time_point& tp ) {
assert(my);
t->_when = tp;
// slog( "when %lld", t->_when.time_since_epoch().count() );
// slog( "delay %lld", (tp - fc::time_point::now()).count() );
task_base* stale_head = my->task_in_queue.load(boost::memory_order_relaxed);
do { t->_next = stale_head;
}while( !my->task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );
@ -391,7 +379,6 @@ namespace fc {
if( !my->current )
my->current = new fc::context(&fc::thread::current());
//slog( " %1% blocking on %2%", my->current, p.get() );
my->current->add_blocking_promise(p.get(), true);
// if not max timeout, added to sleep pqueue
@ -404,15 +391,10 @@ namespace fc {
sleep_priority_less() );
}
// elog( "blocking %1%", my->current );
my->add_to_blocked( my->current );
// my->debug("swtiching fibers..." );
my->start_next_fiber();
// slog( "resuming %1%", my->current );
//slog( " %1% unblocking blocking on %2%", my->current, p.get() );
my->current->remove_blocking_promise(p.get());
my->check_fiber_exceptions();
@ -420,7 +402,6 @@ namespace fc {
void thread::notify( const promise_base::ptr& p )
{
//slog( "this %p my %p", this, my );
BOOST_ASSERT(p->ready());
if( !is_current() )
{
@ -483,7 +464,7 @@ namespace fc {
void thread::notify_task_has_been_canceled()
{
async( [=](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() );
async( [this](){ my->notify_task_has_been_canceled(); }, "notify_task_has_been_canceled", priority::max() );
}
void thread::unblock(fc::context* c)
@ -491,6 +472,26 @@ namespace fc {
my->unblock(c);
}
namespace detail {
idle_guard::idle_guard( thread_d* t ) : notifier(t->notifier)
{
if( notifier )
{
task_base* work = notifier->idle();
if( work )
{
task_base* stale_head = t->task_in_queue.load(boost::memory_order_relaxed);
do {
work->_next = stale_head;
} while( !t->task_in_queue.compare_exchange_weak( stale_head, work, boost::memory_order_release ) );
}
}
}
idle_guard::~idle_guard()
{
if( notifier ) notifier->busy();
}
}
#ifdef _MSC_VER
/* support for providing a structured exception handler for async tasks */

View file

@ -15,12 +15,23 @@ namespace fc {
return a->resume_time > b->resume_time;
}
};
namespace detail {
class idle_guard {
public:
explicit idle_guard( thread_d* t );
~idle_guard();
private:
thread_idle_notifier* notifier;
};
}
class thread_d {
public:
using context_pair = std::pair<thread_d*, fc::context*>;
thread_d(fc::thread& s)
thread_d( fc::thread& s, thread_idle_notifier* n = 0 )
:self(s), boost_thread(0),
task_in_queue(0),
next_posted_num(1),
@ -28,7 +39,8 @@ namespace fc {
current(0),
pt_head(0),
blocked(0),
next_unused_task_storage_slot(0)
next_unused_task_storage_slot(0),
notifier(n)
#ifndef NDEBUG
,non_preemptable_scope_count(0)
#endif
@ -98,6 +110,8 @@ namespace fc {
std::vector<detail::specific_data_info> non_task_specific_data;
unsigned next_unused_task_storage_slot;
thread_idle_notifier *notifier;
#ifndef NDEBUG
unsigned non_preemptable_scope_count;
#endif
@ -585,6 +599,11 @@ namespace fc {
if( done )
return;
detail::idle_guard guard( this );
if( task_in_queue.load(boost::memory_order_relaxed) )
continue;
if( timeout_time == time_point::maximum() )
task_ready.wait( lock );
else if( timeout_time != time_point::min() )
@ -666,7 +685,7 @@ namespace fc {
{
if( fc::thread::current().my != this )
{
self.async( [=](){ unblock(c); }, "thread_d::unblock" );
self.async( [this,c](){ unblock(c); }, "thread_d::unblock" );
return;
}

View file

@ -53,6 +53,7 @@ add_executable( all_tests all_tests.cpp
network/http/websocket_test.cpp
thread/task_cancel.cpp
thread/thread_tests.cpp
thread/parallel_tests.cpp
bloom_test.cpp
real128_test.cpp
serialization_test.cpp

View file

@ -18,10 +18,6 @@ BOOST_AUTO_TEST_CASE(tcpconstructor_test)
class my_io_class : public fc::asio::default_io_service_scope
{
public:
uint16_t get_num_threads()
{
return fc::asio::default_io_service_scope::num_io_threads;
}
static void reset_num_threads() { fc::asio::default_io_service_scope::num_io_threads = 0; }
};

View file

@ -5,14 +5,26 @@ if [ "$#" != 1 ]; then
exit 1
fi
"$1" --list_content 2>&1 \
| grep '\*$' \
| sed 's=\*$==;s=^ =/=' \
| while read t; do
CACHE="$( find . -name CMakeCache.txt )"
if [ "$CACHE" != "" ]; then
BOOST_INC="$( grep Boost_INCLUDE_DIR:PATH "$CACHE" | cut -d= -f 2 )"
if [ "$BOOST_INC" != "" ]; then
BOOST_VERSION="$( grep '^#define *BOOST_VERSION ' "$BOOST_INC/boost/version.hpp" | sed 's=^.* ==' )"
fi
fi
if [ "$BOOST_VERSION" = "" -o "$BOOST_VERSION" -lt 105900 ]; then
echo "Boost version '$BOOST_VERSION' - executing tests serially"
"$1"
else
"$1" --list_content 2>&1 \
| grep '\*$' \
| sed 's=\*$==;s=^ =/=' \
| while read t; do
case "$t" in
/*) echo "$pre$t"; ;;
*) pre="$t"; ;;
esac
done \
| parallel echo Running {}\; "$1" -t {}
done \
| parallel echo Running {}\; "$1" -t {}
fi

View file

@ -0,0 +1,342 @@
/*
* Copyright (c) 2018 The BitShares Blockchain, and contributors.
*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <boost/test/unit_test.hpp>
#include <fc/crypto/elliptic.hpp>
#include <fc/crypto/ripemd160.hpp>
#include <fc/crypto/sha1.hpp>
#include <fc/crypto/sha224.hpp>
#include <fc/crypto/sha256.hpp>
#include <fc/crypto/sha512.hpp>
#include <fc/thread/parallel.hpp>
#include <fc/time.hpp>
#include <iostream>
struct thread_config {
thread_config() {
for( int i = 0; i < boost::unit_test::framework::master_test_suite().argc - 1; ++i )
if( !strcmp( boost::unit_test::framework::master_test_suite().argv[i], "--pool-threads" ) )
{
uint16_t threads = atoi(boost::unit_test::framework::master_test_suite().argv[++i]);
std::cout << "Using " << threads << " pool threads\n";
fc::asio::default_io_service_scope::set_num_threads(threads);
}
}
};
BOOST_GLOBAL_FIXTURE( thread_config );
BOOST_AUTO_TEST_SUITE(parallel_tests)
BOOST_AUTO_TEST_CASE( do_nothing_parallel )
{
std::vector<fc::future<void>> results;
results.reserve( 20 );
for( size_t i = 0; i < results.capacity(); i++ )
results.push_back( fc::do_parallel( [i] () { std::cout << i << ","; } ) );
for( auto& result : results )
result.wait();
std::cout << "\n";
}
BOOST_AUTO_TEST_CASE( do_something_parallel )
{
struct result {
boost::thread::id thread_id;
int call_count;
};
std::vector<fc::future<result>> results;
results.reserve( 20 );
boost::thread_specific_ptr<int> tls;
for( size_t i = 0; i < results.capacity(); i++ )
results.push_back( fc::do_parallel( [i,&tls] () {
if( !tls.get() ) { tls.reset( new int(0) ); }
result res = { boost::this_thread::get_id(), (*tls.get())++ };
return res;
} ) );
std::map<boost::thread::id,std::vector<int>> results_by_thread;
for( auto& res : results )
{
result r = res.wait();
results_by_thread[r.thread_id].push_back( r.call_count );
}
BOOST_CHECK( results_by_thread.size() > 1 ); // require execution by more than 1 thread
for( auto& pair : results_by_thread )
{ // check that thread_local_storage counter works
std::sort( pair.second.begin(), pair.second.end() );
for( size_t i = 0; i < pair.second.size(); i++ )
BOOST_CHECK_EQUAL( i, pair.second[i] );
}
}
const std::string TEXT = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"$%&/()=?,.-#+´{[]}`*'_:;<>|";
template<typename Hash>
class hash_test {
public:
std::string _hashname = fc::get_typename<Hash>::name();
void run_single_threaded() {
const std::string first = Hash::hash(TEXT).str();
fc::time_point start = fc::time_point::now();
for( int i = 0; i < 1000; i++ )
BOOST_CHECK_EQUAL( first, Hash::hash(TEXT).str() );
fc::time_point end = fc::time_point::now();
ilog( "${c} single-threaded ${h}'s in ${t}µs", ("c",1000)("h",_hashname)("t",end-start) );
}
void run_multi_threaded() {
const std::string first = Hash::hash(TEXT).str();
std::vector<fc::future<std::string>> results;
results.reserve( 10000 );
fc::time_point start = fc::time_point::now();
for( int i = 0; i < 10000; i++ )
results.push_back( fc::do_parallel( [] () { return Hash::hash(TEXT).str(); } ) );
for( auto& result: results )
BOOST_CHECK_EQUAL( first, result.wait() );
fc::time_point end = fc::time_point::now();
ilog( "${c} multi-threaded ${h}'s in ${t}µs", ("c",10000)("h",_hashname)("t",end-start) );
}
void run() {
run_single_threaded();
run_multi_threaded();
}
};
BOOST_AUTO_TEST_CASE( hash_parallel )
{
hash_test<fc::ripemd160>().run();
hash_test<fc::sha1>().run();
hash_test<fc::sha224>().run();
hash_test<fc::sha256>().run();
hash_test<fc::sha512>().run();
}
BOOST_AUTO_TEST_CASE( sign_verify_parallel )
{
const fc::sha256 HASH = fc::sha256::hash(TEXT);
std::vector<fc::ecc::private_key> keys;
keys.reserve(1000);
for( int i = 0; i < 1000; i++ )
keys.push_back( fc::ecc::private_key::regenerate( fc::sha256::hash( TEXT + fc::to_string(i) ) ) );
std::vector<fc::ecc::compact_signature> sigs;
sigs.reserve( 10 * keys.size() );
{
fc::time_point start = fc::time_point::now();
for( int i = 0; i < 10; i++ )
for( const auto& key: keys )
sigs.push_back( key.sign_compact( HASH ) );
fc::time_point end = fc::time_point::now();
ilog( "${c} single-threaded signatures in ${t}µs", ("c",sigs.size())("t",end-start) );
}
{
fc::time_point start = fc::time_point::now();
for( size_t i = 0; i < sigs.size(); i++ )
BOOST_CHECK( keys[i % keys.size()].get_public_key() == fc::ecc::public_key( sigs[i], HASH ) );
fc::time_point end = fc::time_point::now();
ilog( "${c} single-threaded verifies in ${t}µs", ("c",sigs.size())("t",end-start) );
}
{
std::vector<fc::future<fc::ecc::compact_signature>> results;
results.reserve( 10 * keys.size() );
fc::time_point start = fc::time_point::now();
for( int i = 0; i < 10; i++ )
for( const auto& key: keys )
results.push_back( fc::do_parallel( [&key,&HASH] () { return key.sign_compact( HASH ); } ) );
for( auto& res : results )
res.wait();
fc::time_point end = fc::time_point::now();
ilog( "${c} multi-threaded signatures in ${t}µs", ("c",sigs.size())("t",end-start) );
}
{
std::vector<fc::future<fc::ecc::public_key>> results;
results.reserve( sigs.size() );
fc::time_point start = fc::time_point::now();
for( const auto& sig: sigs )
results.push_back( fc::do_parallel( [&sig,&HASH] () { return fc::ecc::public_key( sig, HASH ); } ) );
for( size_t i = 0; i < results.size(); i++ )
BOOST_CHECK( keys[i % keys.size()].get_public_key() == results[i].wait() );
fc::time_point end = fc::time_point::now();
ilog( "${c} multi-threaded verifies in ${t}µs", ("c",sigs.size())("t",end-start) );
}
}
BOOST_AUTO_TEST_CASE( serial_valve )
{
boost::atomic<uint32_t> counter(0);
fc::serial_valve valve;
{ // Simple test, f2 finishes before f1
fc::promise<void>* syncer = new fc::promise<void>();
fc::promise<void>* waiter = new fc::promise<void>();
auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
valve.do_serial( [syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
// at this point, p1.f1 has started executing and is waiting on waiter
syncer = new fc::promise<void>();
auto p2 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
fc::usleep( fc::milliseconds(10) );
// at this point, p2.f1 has started executing and p2.f2 is waiting for its turn
BOOST_CHECK( !p1.ready() );
BOOST_CHECK( !p2.ready() );
waiter->set_value(); // signal p1.f1 to continue
p2.wait(); // and wait for p2.f2 to complete
BOOST_CHECK( p1.ready() );
BOOST_CHECK( p2.ready() );
BOOST_CHECK_EQUAL( 2, counter.load() );
}
{ // Triple test, f3 finishes first, then f1, finally f2
fc::promise<void>* syncer = new fc::promise<void>();
fc::promise<void>* waiter = new fc::promise<void>();
counter.store(0);
auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
valve.do_serial( [&syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
// at this point, p1.f1 has started executing and is waiting on waiter
syncer = new fc::promise<void>();
auto p2 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [&syncer](){ syncer->set_value();
fc::usleep( fc::milliseconds(100) ); },
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
// at this point, p2.f1 has started executing and is sleeping
syncer = new fc::promise<void>();
auto p3 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
fc::usleep( fc::milliseconds(10) );
// at this point, p3.f1 has started executing and p3.f2 is waiting for its turn
BOOST_CHECK( !p1.ready() );
BOOST_CHECK( !p2.ready() );
BOOST_CHECK( !p3.ready() );
waiter->set_value(); // signal p1.f1 to continue
p3.wait(); // and wait for p3.f2 to complete
BOOST_CHECK( p1.ready() );
BOOST_CHECK( p2.ready() );
BOOST_CHECK( p3.ready() );
BOOST_CHECK_EQUAL( 3, counter.load() );
}
{ // Triple test again but with invocations from different threads
fc::promise<void>* syncer = new fc::promise<void>();
fc::promise<void>* waiter = new fc::promise<void>();
counter.store(0);
auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () {
valve.do_serial( [&syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
// at this point, p1.f1 has started executing and is waiting on waiter
syncer = new fc::promise<void>();
auto p2 = fc::do_parallel([&counter,&valve,syncer] () {
valve.do_serial( [&syncer](){ syncer->set_value();
fc::usleep( fc::milliseconds(100) ); },
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
// at this point, p2.f1 has started executing and is sleeping
syncer = new fc::promise<void>();
auto p3 = fc::do_parallel([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();
fc::usleep( fc::milliseconds(10) );
// at this point, p3.f1 has started executing and p3.f2 is waiting for its turn
BOOST_CHECK( !p1.ready() );
BOOST_CHECK( !p2.ready() );
BOOST_CHECK( !p3.ready() );
waiter->set_value(); // signal p1.f1 to continue
p3.wait(); // and wait for p3.f2 to complete
BOOST_CHECK( p1.ready() );
BOOST_CHECK( p2.ready() );
BOOST_CHECK( p3.ready() );
BOOST_CHECK_EQUAL( 3, counter.load() );
}
}
BOOST_AUTO_TEST_SUITE_END()