RPC now supports remote callbacks

to_variant now skips null optional members on reflected objects.
This commit is contained in:
Daniel Larimer 2015-03-31 11:31:56 -04:00
parent bcd642e31f
commit 1fb31737a7
6 changed files with 280 additions and 101 deletions

View file

@ -61,6 +61,7 @@ namespace fc {
friend bool operator == ( const api& a, const api& b ) { return a._data == b._data && a._vtable == b._vtable; }
friend bool operator != ( const api& a, const api& b ) { return !(a._data == b._data && a._vtable == b._vtable); }
uint64_t get_handle()const { return uint64_t(_data.get()); }
vtable_type& operator*()const { FC_ASSERT( _vtable ); return *_vtable; }
vtable_type* operator->()const { FC_ASSERT( _vtable ); return _vtable.get(); }

View file

@ -20,9 +20,20 @@ namespace fc
template<typename Member, class Class, Member (Class::*member)>
void operator()( const char* name )const
{
vo(name,(val.*member));
this->add(vo,name,(val.*member));
}
private:
template<typename M>
void add( mutable_variant_object& vo, const char* name, const optional<M>& v )const
{
if( v.valid() )
vo(name,*v);
}
template<typename M>
void add( mutable_variant_object& vo, const char* name, const M& v )const
{ vo(name,v); }
mutable_variant_object& vo;
const T& val;
};

View file

@ -15,6 +15,58 @@ namespace fc {
typedef uint32_t api_id_type;
namespace detail {
template<typename Signature>
class callback_functor
{
public:
typedef typename std::function<Signature>::result_type result_type;
callback_functor( fc::api_connection& con, uint64_t id )
:_callback_id(id),_api_connection(con){}
template<typename... Args>
result_type operator()( Args... args )const;
private:
uint64_t _callback_id;
fc::api_connection& _api_connection;
};
template<typename R, typename Arg0, typename ... Args>
std::function<R(Args...)> bind_first_arg( const std::function<R(Arg0,Args...)>& f, Arg0 a0 )
{
return [=]( Args... args ) { return f( a0, args... ); };
}
template<typename R>
R call_generic( const std::function<R()>& f, variants::const_iterator a0, variants::const_iterator e )
{
return f();
}
template<typename R, typename Arg0, typename ... Args>
R call_generic( const std::function<R(Arg0,Args...)>& f, variants::const_iterator a0, variants::const_iterator e )
{
FC_ASSERT( a0 != e );
return call_generic<R,Args...>( bind_first_arg<R,Arg0,Args...>( f, a0->as< typename std::decay<Arg0>::type >() ), a0+1, e );
}
template<typename R, typename ... Args>
std::function<variant(const fc::variants&)> to_generic( const std::function<R(Args...)>& f )
{
return [=]( const variants& args ) {
return variant( call_generic( f, args.begin(), args.end() ) );
};
}
template<typename ... Args>
std::function<variant(const fc::variants&)> to_generic( const std::function<void(Args...)>& f )
{
return [=]( const variants& args ) {
call_generic( f, args.begin(), args.end() );
return variant();
};
}
class generic_api
{
public:
@ -36,31 +88,50 @@ namespace fc {
return _methods[method_id](args);
}
fc::api_connection& get_connection(){ return *_api_connection; }
private:
friend struct api_visitor;
template<typename R, typename Arg0, typename ... Args>
std::function<R(Args...)> bind_first_arg( const std::function<R(Arg0,Args...)>& f, Arg0 a0 )const
{
return [=]( Args... args ) { return f( a0, args... ); };
}
template<typename R>
R call_generic( const std::function<R()>& f, variants::const_iterator a0, variants::const_iterator e )const
{
return f();
}
template<typename R, typename Signature, typename ... Args>
R call_generic( const std::function<R(std::function<Signature>,Args...)>& f, variants::const_iterator a0, variants::const_iterator e )
{
FC_ASSERT( a0 != e );
callback_functor<Signature> arg0( *this, a0->as<uint64_t>() );
return call_generic<R,Args...>( this->bind_first_arg<R,std::function<Signature>,Args...>( f, std::function<Signature>(arg0) ), a0+1, e );
}
template<typename R, typename Signature, typename ... Args>
R call_generic( const std::function<R(const std::function<Signature>&,Args...)>& f, variants::const_iterator a0, variants::const_iterator e )
{
FC_ASSERT( a0 != e );
callback_functor<Signature> arg0( get_connection(), a0->as<uint64_t>() );
return call_generic<R,Args...>( this->bind_first_arg<R,const std::function<Signature>&,Args...>( f, arg0 ), a0+1, e );
}
template<typename R, typename Arg0, typename ... Args>
R call_generic( const std::function<R(Arg0,Args...)>& f, variants::const_iterator a0, variants::const_iterator e )
{
FC_ASSERT( a0 != e );
return call_generic<R,Args...>( this->bind_first_arg<R,Arg0,Args...>( f, a0->as< typename std::decay<Arg0>::type >() ), a0+1, e );
}
struct api_visitor
{
api_visitor( generic_api& a, const std::shared_ptr<fc::api_connection>& s ):api(a),_api_con(s){ }
template<typename R, typename Arg0, typename ... Args>
std::function<R(Args...)> bind_first_arg( const std::function<R(Arg0,Args...)>& f, Arg0 a0 )const
{
return [=]( Args... args ) { return f( a0, args... ); };
}
template<typename R>
R call_generic( const std::function<R()>& f, variants::const_iterator a0, variants::const_iterator e )const
{
return f();
}
template<typename R, typename Arg0, typename ... Args>
R call_generic( const std::function<R(Arg0,Args...)>& f, variants::const_iterator a0, variants::const_iterator e )const
{
FC_ASSERT( a0 != e );
return call_generic<R,Args...>( bind_first_arg<R,Arg0,Args...>( f, a0->as< typename std::decay<Arg0>::type >() ), a0+1, e );
}
template<typename Interface, typename Adaptor, typename ... Args>
std::function<variant(const fc::variants&)> to_generic( const std::function<api<Interface,Adaptor>(Args...)>& f )const;
@ -68,21 +139,10 @@ namespace fc {
std::function<variant(const fc::variants&)> to_generic( const std::function<fc::optional<api<Interface,Adaptor>>(Args...)>& f )const;
template<typename R, typename ... Args>
std::function<variant(const fc::variants&)> to_generic( const std::function<R(Args...)>& f )const
{
return [=]( const variants& args ) {
return variant( call_generic( f, args.begin(), args.end() ) );
};
}
std::function<variant(const fc::variants&)> to_generic( const std::function<R(Args...)>& f )const;
template<typename ... Args>
std::function<variant(const fc::variants&)> to_generic( const std::function<void(Args...)>& f )const
{
return [=]( const variants& args ) {
call_generic( f, args.begin(), args.end() );
return variant();
};
}
std::function<variant(const fc::variants&)> to_generic( const std::function<void(Args...)>& f )const;
template<typename Result, typename... Args>
void operator()( const char* name, std::function<Result(Args...)>& memb )const {
@ -120,24 +180,49 @@ namespace fc {
}
/** makes calls to the remote server */
virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) = 0;
virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) = 0;
virtual variant send_callback( uint64_t callback_id, variants args = variants() ) = 0;
virtual void send_notice( uint64_t callback_id, variants args = variants() ) = 0;
variant receive_call( api_id_type api_id, const string& method_name, const variants& args = variants() )const
{
//wdump( (api_id)(method_name)(args) );
FC_ASSERT( _local_apis.size() > api_id );
return _local_apis[api_id]->call( method_name, args );
}
variant receive_callback( uint64_t callback_id, const variants& args = variants() )const
{
FC_ASSERT( _local_callbacks.size() > callback_id );
return _local_callbacks[callback_id]( args );
}
void receive_notice( uint64_t callback_id, const variants& args = variants() )const
{
FC_ASSERT( _local_callbacks.size() > callback_id );
_local_callbacks[callback_id]( args );
}
template<typename Interface>
api_id_type register_api( const Interface& a )
{
auto handle = a.get_handle();
auto itr = _handle_to_id.find(handle);
if( itr != _handle_to_id.end() ) return itr->second;
_local_apis.push_back( std::unique_ptr<detail::generic_api>( new detail::generic_api(a, shared_from_this() ) ) );
_handle_to_id[handle] = _local_apis.size() - 1;
return _local_apis.size() - 1;
}
template<typename Signature>
uint64_t register_callback( const std::function<Signature>& cb )
{
_local_callbacks.push_back( detail::to_generic( cb ) );
return _local_callbacks.size() - 1;
}
private:
std::vector< std::unique_ptr<detail::generic_api> > _local_apis;
std::vector< std::unique_ptr<detail::generic_api> > _local_apis;
std::map< uint64_t, api_id_type > _handle_to_id;
std::vector< std::function<variant(const variants&)> > _local_callbacks;
struct api_visitor
@ -160,13 +245,24 @@ namespace fc {
template<typename ResultInterface>
static fc::api<ResultInterface> from_variant( const variant& v,
fc::api<ResultInterface>* /*used for template deduction*/,
const std::shared_ptr<fc::api_connection>& con
)
fc::api<ResultInterface>* /*used for template deduction*/,
const std::shared_ptr<fc::api_connection>& con
)
{
return con->get_remote_api<ResultInterface>( v.as_uint64() );
}
template<typename T>
static fc::variant convert_callbacks( const std::shared_ptr<fc::api_connection>&, const T& v )
{
return fc::variant(v);
}
template<typename Signature>
static fc::variant convert_callbacks( const std::shared_ptr<fc::api_connection>& con, const std::function<Signature>& v )
{
return con->register_callback( v );
}
template<typename Result, typename... Args>
void operator()( const char* name, std::function<Result(Args...)>& memb )const
@ -174,10 +270,19 @@ namespace fc {
auto con = _connection;
auto api_id = _api_id;
memb = [con,api_id,name]( Args... args ) {
auto var_result = con->send_call( api_id, name, {args...} );
auto var_result = con->send_call( api_id, name, { convert_callbacks(con,args)...} );
return from_variant( var_result, (Result*)nullptr, con );
};
}
template<typename... Args>
void operator()( const char* name, std::function<void(Args...)>& memb )const
{
auto con = _connection;
auto api_id = _api_id;
memb = [con,api_id,name]( Args... args ) {
con->send_call( api_id, name, { convert_callbacks(con,args)...} );
};
}
};
};
@ -185,11 +290,22 @@ namespace fc {
{
public:
/** makes calls to the remote server */
virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) override
virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) override
{
FC_ASSERT( _remote_connection );
return _remote_connection->receive_call( api_id, method_name, args );
return _remote_connection->receive_call( api_id, method_name, std::move(args) );
}
virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override
{
FC_ASSERT( _remote_connection );
return _remote_connection->receive_callback( callback_id, args );
}
virtual void send_notice( uint64_t callback_id, variants args = variants() ) override
{
FC_ASSERT( _remote_connection );
_remote_connection->receive_notice( callback_id, args );
}
void set_remote_connection( const std::shared_ptr<fc::api_connection>& rc )
{
@ -214,8 +330,9 @@ namespace fc {
const std::function<fc::api<Interface,Adaptor>(Args...)>& f )const
{
auto api_con = _api_con;
auto gapi = &api;
return [=]( const variants& args ) {
auto api_result = call_generic( f, args.begin(), args.end() );
auto api_result = gapi->call_generic( f, args.begin(), args.end() );
return api_con->register_api( api_result );
};
}
@ -224,46 +341,60 @@ namespace fc {
const std::function<fc::optional<fc::api<Interface,Adaptor>>(Args...)>& f )const
{
auto api_con = _api_con;
auto gapi = &api;
return [=]( const variants& args )-> fc::variant {
auto api_result = call_generic( f, args.begin(), args.end() );
auto api_result = gapi->call_generic( f, args.begin(), args.end() );
if( api_result )
return api_con->register_api( *api_result );
return variant();
};
}
/*
class json_api_connection : public api_connection
template<typename R, typename ... Args>
std::function<variant(const fc::variants&)> detail::generic_api::api_visitor::to_generic( const std::function<R(Args...)>& f )const
{
public:
json_api_connection(){};
void set_json_connection( const std::shared_ptr<rpc::json_connection>& json_con )
{
json_con->add_method( "call", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 3 && args[2].is_array() );
return this->receive_call( args[0].as_uint64(),
args[1].as_string(),
args[2].get_array() );
generic_api* gapi = &api;
return [f,gapi]( const variants& args ) {
return variant( gapi->call_generic( f, args.begin(), args.end() ) );
};
}
});
}
template<typename ... Args>
std::function<variant(const fc::variants&)> detail::generic_api::api_visitor::to_generic( const std::function<void(Args...)>& f )const
{
generic_api* gapi = &api;
return [f,gapi]( const variants& args ) {
gapi->call_generic( f, args.begin(), args.end() );
return variant();
};
}
json_api_connection( const std::shared_ptr<rpc::json_connection>& json_con )
{
set_json_connection( json_con );
}
namespace detail {
template<typename Signature>
template<typename... Args>
typename callback_functor<Signature>::result_type callback_functor<Signature>::operator()( Args... args )const
{
_api_connection.send_callback( _callback_id, fc::variants{ args... } ).template as< result_type >();
}
virtual variant send_call( api_id_type api_id,
const string& method_name,
const variants& args = variants() )const override
{
return _json_con->async_call( "call", {api_id, method_name, args} ).wait();
}
private:
std::shared_ptr<rpc::json_connection> _json_con;
template<typename... Args>
class callback_functor<void(Args...)>
{
public:
typedef void result_type;
};
*/
callback_functor( fc::api_connection& con, uint64_t id )
:_callback_id(id),_api_connection(con){}
void operator()( Args... args )const
{
_api_connection.send_notice( _callback_id, fc::variants{ args... } );
}
private:
uint64_t _callback_id;
fc::api_connection& _api_connection;
};
} // namespace detail
} // fc

View file

@ -19,45 +19,70 @@ namespace fc { namespace rpc {
args[1].as_string(),
args[2].get_array() );
});
_rpc_state.add_method( "notice", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_notice( args[0].as_uint64(), args[1].get_array() );
return variant();
});
_rpc_state.add_method( "callback", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_callback( args[0].as_uint64(), args[1].get_array() );
return variant();
});
_connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg); } );
}
virtual variant send_call( api_id_type api_id,
const string& method_name,
const variants& args = variants() ) override
string method_name,
variants args = variants() ) override
{
auto request = _rpc_state.start_remote_call( "call", {api_id, method_name, args} );
auto request = _rpc_state.start_remote_call( "call", {api_id, std::move(method_name), std::move(args) } );
_connection.send_message( fc::json::to_string(request) );
return _rpc_state.wait_for_response( *request.id );
}
virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override
{
auto request = _rpc_state.start_remote_call( "callback", {callback_id, std::move(args) } );
_connection.send_message( fc::json::to_string(request) );
return _rpc_state.wait_for_response( *request.id );
}
virtual void send_notice( uint64_t callback_id, variants args = variants() ) override
{
fc::rpc::request req{ optional<uint64_t>(), "notice", {callback_id, std::move(args)}};
_connection.send_message( fc::json::to_string(req) );
}
protected:
void on_message( const std::string& message )
{
auto var = fc::json::from_string(message);
const auto& var_obj = var.get_object();
if( var_obj.contains( "method" ) )
{
auto call = var.as<fc::rpc::request>();
try {
auto result = _rpc_state.local_call( call.method, call.params );
if( call.id )
{
_connection.send_message( fc::json::to_string( response( *call.id, result ) ) );
}
}
catch ( const fc::exception& e )
{
try {
auto var = fc::json::from_string(message);
const auto& var_obj = var.get_object();
if( var_obj.contains( "method" ) )
{
if( call.id )
auto call = var.as<fc::rpc::request>();
try {
auto result = _rpc_state.local_call( call.method, call.params );
if( call.id )
{
_connection.send_message( fc::json::to_string( response( *call.id, result ) ) );
}
}
catch ( const fc::exception& e )
{
_connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) );
if( call.id )
{
_connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) );
}
}
}
}
else
{
auto reply = var.as<fc::rpc::response>();
_rpc_state.handle_reply( reply );
else
{
auto reply = var.as<fc::rpc::response>();
_rpc_state.handle_reply( reply );
}
} catch ( const fc::exception& e ) {
wdump((e.to_detail_string()));
}
}
fc::http::websocket_connection& _connection;

View file

@ -31,11 +31,12 @@ void state::handle_reply( const response& response )
FC_ASSERT( await != _awaiting.end(), "Unknown Response ID: ${id}", ("id",response.id)("response",response) );
if( response.result )
await->second->set_value( *response.result );
else
else if( response.error )
{
FC_ASSERT( response.error );
await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",*response.error) ) ) );
}
else
await->second->set_value( fc::variant() );
_awaiting.erase(await);
}

View file

@ -8,9 +8,10 @@ class calculator
public:
int32_t add( int32_t a, int32_t b ); // not implemented
int32_t sub( int32_t a, int32_t b ); // not implemented
void on_result( const std::function<void(int32_t)>& cb );
};
FC_API( calculator, (add)(sub) )
FC_API( calculator, (add)(sub)(on_result) )
class login_api
@ -31,14 +32,18 @@ using namespace fc;
class some_calculator
{
public:
int32_t add( int32_t a, int32_t b ) { return a+b; }
int32_t sub( int32_t a, int32_t b ) { return a-b; }
int32_t add( int32_t a, int32_t b ) { wlog("."); if( _cb ) _cb(a+b); return a+b; }
int32_t sub( int32_t a, int32_t b ) { wlog(".");if( _cb ) _cb(a-b); return a-b; }
void on_result( const std::function<void(int32_t)>& cb ) { wlog( "set callback" ); _cb = cb; return ; }
std::function<void(int32_t)> _cb;
};
class variant_calculator
{
public:
double add( fc::variant a, fc::variant b ) { return a.as_double()+b.as_double(); }
double sub( fc::variant a, fc::variant b ) { return a.as_double()-b.as_double(); }
void on_result( const std::function<void(int32_t)>& cb ) { wlog("set callback"); _cb = cb; return ; }
std::function<void(int32_t)> _cb;
};
using namespace fc::http;
@ -46,7 +51,7 @@ using namespace fc::rpc;
int main( int argc, char** argv )
{
{
try {
fc::api<calculator> calc_api( std::make_shared<some_calculator>() );
fc::http::websocket_server server;
@ -69,6 +74,7 @@ int main( int argc, char** argv )
auto apic = std::make_shared<websocket_api_connection>(*con);
auto remote_login_api = apic->get_remote_api<login_api>();
auto remote_calc = remote_login_api->get_calc();
remote_calc->on_result( []( uint32_t r ) { elog( "callback result ${r}", ("r",r) ); } );
wdump((remote_calc->add( 4, 5 )));
} catch ( const fc::exception& e )
{
@ -76,6 +82,10 @@ int main( int argc, char** argv )
}
}
wlog( "exit scope" );
}
catch( const fc::exception& e )
{
edump((e.to_detail_string()));
}
wlog( "returning now..." );