From 1fb31737a733fca75165f13e0252787108970fc9 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Tue, 31 Mar 2015 11:31:56 -0400 Subject: [PATCH] RPC now supports remote callbacks to_variant now skips null optional members on reflected objects. --- include/fc/api.hpp | 1 + include/fc/reflect/variant.hpp | 13 +- include/fc/rpc/api_connection.hpp | 271 ++++++++++++++++++++++-------- include/fc/rpc/websocket_api.hpp | 73 +++++--- src/rpc/state.cpp | 5 +- tests/api.cpp | 18 +- 6 files changed, 280 insertions(+), 101 deletions(-) diff --git a/include/fc/api.hpp b/include/fc/api.hpp index cf42287..e84f50b 100644 --- a/include/fc/api.hpp +++ b/include/fc/api.hpp @@ -61,6 +61,7 @@ namespace fc { friend bool operator == ( const api& a, const api& b ) { return a._data == b._data && a._vtable == b._vtable; } friend bool operator != ( const api& a, const api& b ) { return !(a._data == b._data && a._vtable == b._vtable); } + uint64_t get_handle()const { return uint64_t(_data.get()); } vtable_type& operator*()const { FC_ASSERT( _vtable ); return *_vtable; } vtable_type* operator->()const { FC_ASSERT( _vtable ); return _vtable.get(); } diff --git a/include/fc/reflect/variant.hpp b/include/fc/reflect/variant.hpp index bc14fc4..f48fe9a 100644 --- a/include/fc/reflect/variant.hpp +++ b/include/fc/reflect/variant.hpp @@ -20,9 +20,20 @@ namespace fc template void operator()( const char* name )const { - vo(name,(val.*member)); + this->add(vo,name,(val.*member)); } + private: + template + void add( mutable_variant_object& vo, const char* name, const optional& v )const + { + if( v.valid() ) + vo(name,*v); + } + template + void add( mutable_variant_object& vo, const char* name, const M& v )const + { vo(name,v); } + mutable_variant_object& vo; const T& val; }; diff --git a/include/fc/rpc/api_connection.hpp b/include/fc/rpc/api_connection.hpp index a4781c9..aece532 100644 --- a/include/fc/rpc/api_connection.hpp +++ b/include/fc/rpc/api_connection.hpp @@ -15,6 +15,58 @@ namespace fc { typedef uint32_t api_id_type; namespace detail { + template + class callback_functor + { + public: + typedef typename std::function::result_type result_type; + + callback_functor( fc::api_connection& con, uint64_t id ) + :_callback_id(id),_api_connection(con){} + + template + result_type operator()( Args... args )const; + + private: + uint64_t _callback_id; + fc::api_connection& _api_connection; + }; + + template + std::function bind_first_arg( const std::function& f, Arg0 a0 ) + { + return [=]( Args... args ) { return f( a0, args... ); }; + } + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + return f(); + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + return call_generic( bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); + } + + template + std::function to_generic( const std::function& f ) + { + return [=]( const variants& args ) { + return variant( call_generic( f, args.begin(), args.end() ) ); + }; + } + + template + std::function to_generic( const std::function& f ) + { + return [=]( const variants& args ) { + call_generic( f, args.begin(), args.end() ); + return variant(); + }; + } + class generic_api { public: @@ -36,31 +88,50 @@ namespace fc { return _methods[method_id](args); } + fc::api_connection& get_connection(){ return *_api_connection; } + + private: friend struct api_visitor; + template + std::function bind_first_arg( const std::function& f, Arg0 a0 )const + { + return [=]( Args... args ) { return f( a0, args... ); }; + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const + { + return f(); + } + + template + R call_generic( const std::function,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + callback_functor arg0( *this, a0->as() ); + return call_generic( this->bind_first_arg,Args...>( f, std::function(arg0) ), a0+1, e ); + } + template + R call_generic( const std::function&,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + callback_functor arg0( get_connection(), a0->as() ); + return call_generic( this->bind_first_arg&,Args...>( f, arg0 ), a0+1, e ); + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + return call_generic( this->bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); + } + struct api_visitor { api_visitor( generic_api& a, const std::shared_ptr& s ):api(a),_api_con(s){ } - template - std::function bind_first_arg( const std::function& f, Arg0 a0 )const - { - return [=]( Args... args ) { return f( a0, args... ); }; - } - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const - { - return f(); - } - - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const - { - FC_ASSERT( a0 != e ); - return call_generic( bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); - } - template std::function to_generic( const std::function(Args...)>& f )const; @@ -68,21 +139,10 @@ namespace fc { std::function to_generic( const std::function>(Args...)>& f )const; template - std::function to_generic( const std::function& f )const - { - return [=]( const variants& args ) { - return variant( call_generic( f, args.begin(), args.end() ) ); - }; - } + std::function to_generic( const std::function& f )const; template - std::function to_generic( const std::function& f )const - { - return [=]( const variants& args ) { - call_generic( f, args.begin(), args.end() ); - return variant(); - }; - } + std::function to_generic( const std::function& f )const; template void operator()( const char* name, std::function& memb )const { @@ -120,24 +180,49 @@ namespace fc { } /** makes calls to the remote server */ - virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) = 0; + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) = 0; + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) = 0; + virtual void send_notice( uint64_t callback_id, variants args = variants() ) = 0; variant receive_call( api_id_type api_id, const string& method_name, const variants& args = variants() )const { - //wdump( (api_id)(method_name)(args) ); FC_ASSERT( _local_apis.size() > api_id ); return _local_apis[api_id]->call( method_name, args ); } + variant receive_callback( uint64_t callback_id, const variants& args = variants() )const + { + FC_ASSERT( _local_callbacks.size() > callback_id ); + return _local_callbacks[callback_id]( args ); + } + void receive_notice( uint64_t callback_id, const variants& args = variants() )const + { + FC_ASSERT( _local_callbacks.size() > callback_id ); + _local_callbacks[callback_id]( args ); + } template api_id_type register_api( const Interface& a ) { + auto handle = a.get_handle(); + auto itr = _handle_to_id.find(handle); + if( itr != _handle_to_id.end() ) return itr->second; + _local_apis.push_back( std::unique_ptr( new detail::generic_api(a, shared_from_this() ) ) ); + _handle_to_id[handle] = _local_apis.size() - 1; return _local_apis.size() - 1; } + template + uint64_t register_callback( const std::function& cb ) + { + _local_callbacks.push_back( detail::to_generic( cb ) ); + return _local_callbacks.size() - 1; + } + private: - std::vector< std::unique_ptr > _local_apis; + std::vector< std::unique_ptr > _local_apis; + std::map< uint64_t, api_id_type > _handle_to_id; + std::vector< std::function > _local_callbacks; struct api_visitor @@ -160,13 +245,24 @@ namespace fc { template static fc::api from_variant( const variant& v, - fc::api* /*used for template deduction*/, - const std::shared_ptr& con - ) + fc::api* /*used for template deduction*/, + const std::shared_ptr& con + ) { return con->get_remote_api( v.as_uint64() ); } + template + static fc::variant convert_callbacks( const std::shared_ptr&, const T& v ) + { + return fc::variant(v); + } + + template + static fc::variant convert_callbacks( const std::shared_ptr& con, const std::function& v ) + { + return con->register_callback( v ); + } template void operator()( const char* name, std::function& memb )const @@ -174,10 +270,19 @@ namespace fc { auto con = _connection; auto api_id = _api_id; memb = [con,api_id,name]( Args... args ) { - auto var_result = con->send_call( api_id, name, {args...} ); + auto var_result = con->send_call( api_id, name, { convert_callbacks(con,args)...} ); return from_variant( var_result, (Result*)nullptr, con ); }; } + template + void operator()( const char* name, std::function& memb )const + { + auto con = _connection; + auto api_id = _api_id; + memb = [con,api_id,name]( Args... args ) { + con->send_call( api_id, name, { convert_callbacks(con,args)...} ); + }; + } }; }; @@ -185,11 +290,22 @@ namespace fc { { public: /** makes calls to the remote server */ - virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) override + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) override { FC_ASSERT( _remote_connection ); - return _remote_connection->receive_call( api_id, method_name, args ); + return _remote_connection->receive_call( api_id, method_name, std::move(args) ); } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override + { + FC_ASSERT( _remote_connection ); + return _remote_connection->receive_callback( callback_id, args ); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) override + { + FC_ASSERT( _remote_connection ); + _remote_connection->receive_notice( callback_id, args ); + } + void set_remote_connection( const std::shared_ptr& rc ) { @@ -214,8 +330,9 @@ namespace fc { const std::function(Args...)>& f )const { auto api_con = _api_con; + auto gapi = &api; return [=]( const variants& args ) { - auto api_result = call_generic( f, args.begin(), args.end() ); + auto api_result = gapi->call_generic( f, args.begin(), args.end() ); return api_con->register_api( api_result ); }; } @@ -224,46 +341,60 @@ namespace fc { const std::function>(Args...)>& f )const { auto api_con = _api_con; + auto gapi = &api; return [=]( const variants& args )-> fc::variant { - auto api_result = call_generic( f, args.begin(), args.end() ); + auto api_result = gapi->call_generic( f, args.begin(), args.end() ); if( api_result ) return api_con->register_api( *api_result ); return variant(); }; } - - /* - class json_api_connection : public api_connection + template + std::function detail::generic_api::api_visitor::to_generic( const std::function& f )const { - public: - json_api_connection(){}; - void set_json_connection( const std::shared_ptr& json_con ) - { - json_con->add_method( "call", [this]( const variants& args ) -> variant { - FC_ASSERT( args.size() == 3 && args[2].is_array() ); - return this->receive_call( args[0].as_uint64(), - args[1].as_string(), - args[2].get_array() ); + generic_api* gapi = &api; + return [f,gapi]( const variants& args ) { + return variant( gapi->call_generic( f, args.begin(), args.end() ) ); + }; + } - }); - } + template + std::function detail::generic_api::api_visitor::to_generic( const std::function& f )const + { + generic_api* gapi = &api; + return [f,gapi]( const variants& args ) { + gapi->call_generic( f, args.begin(), args.end() ); + return variant(); + }; + } - json_api_connection( const std::shared_ptr& json_con ) - { - set_json_connection( json_con ); - } + namespace detail { + template + template + typename callback_functor::result_type callback_functor::operator()( Args... args )const + { + _api_connection.send_callback( _callback_id, fc::variants{ args... } ).template as< result_type >(); + } - virtual variant send_call( api_id_type api_id, - const string& method_name, - const variants& args = variants() )const override - { - return _json_con->async_call( "call", {api_id, method_name, args} ).wait(); - } - private: - std::shared_ptr _json_con; + template + class callback_functor + { + public: + typedef void result_type; - }; - */ + callback_functor( fc::api_connection& con, uint64_t id ) + :_callback_id(id),_api_connection(con){} + + void operator()( Args... args )const + { + _api_connection.send_notice( _callback_id, fc::variants{ args... } ); + } + + private: + uint64_t _callback_id; + fc::api_connection& _api_connection; + }; + } // namespace detail } // fc diff --git a/include/fc/rpc/websocket_api.hpp b/include/fc/rpc/websocket_api.hpp index 36bd8e7..833c0e1 100644 --- a/include/fc/rpc/websocket_api.hpp +++ b/include/fc/rpc/websocket_api.hpp @@ -19,45 +19,70 @@ namespace fc { namespace rpc { args[1].as_string(), args[2].get_array() ); }); + _rpc_state.add_method( "notice", [this]( const variants& args ) -> variant { + FC_ASSERT( args.size() == 2 && args[1].is_array() ); + this->receive_notice( args[0].as_uint64(), args[1].get_array() ); + return variant(); + }); + _rpc_state.add_method( "callback", [this]( const variants& args ) -> variant { + FC_ASSERT( args.size() == 2 && args[1].is_array() ); + this->receive_callback( args[0].as_uint64(), args[1].get_array() ); + return variant(); + }); _connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); } virtual variant send_call( api_id_type api_id, - const string& method_name, - const variants& args = variants() ) override + string method_name, + variants args = variants() ) override { - auto request = _rpc_state.start_remote_call( "call", {api_id, method_name, args} ); + auto request = _rpc_state.start_remote_call( "call", {api_id, std::move(method_name), std::move(args) } ); _connection.send_message( fc::json::to_string(request) ); return _rpc_state.wait_for_response( *request.id ); } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override + { + auto request = _rpc_state.start_remote_call( "callback", {callback_id, std::move(args) } ); + _connection.send_message( fc::json::to_string(request) ); + return _rpc_state.wait_for_response( *request.id ); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) override + { + fc::rpc::request req{ optional(), "notice", {callback_id, std::move(args)}}; + _connection.send_message( fc::json::to_string(req) ); + } protected: void on_message( const std::string& message ) - { - auto var = fc::json::from_string(message); - const auto& var_obj = var.get_object(); - if( var_obj.contains( "method" ) ) - { - auto call = var.as(); - try { - auto result = _rpc_state.local_call( call.method, call.params ); - if( call.id ) - { - _connection.send_message( fc::json::to_string( response( *call.id, result ) ) ); - } - } - catch ( const fc::exception& e ) + { + try { + auto var = fc::json::from_string(message); + const auto& var_obj = var.get_object(); + if( var_obj.contains( "method" ) ) { - if( call.id ) + auto call = var.as(); + try { + auto result = _rpc_state.local_call( call.method, call.params ); + if( call.id ) + { + _connection.send_message( fc::json::to_string( response( *call.id, result ) ) ); + } + } + catch ( const fc::exception& e ) { - _connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + if( call.id ) + { + _connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + } } } - } - else - { - auto reply = var.as(); - _rpc_state.handle_reply( reply ); + else + { + auto reply = var.as(); + _rpc_state.handle_reply( reply ); + } + } catch ( const fc::exception& e ) { + wdump((e.to_detail_string())); } } fc::http::websocket_connection& _connection; diff --git a/src/rpc/state.cpp b/src/rpc/state.cpp index 4a67faa..eec8584 100644 --- a/src/rpc/state.cpp +++ b/src/rpc/state.cpp @@ -31,11 +31,12 @@ void state::handle_reply( const response& response ) FC_ASSERT( await != _awaiting.end(), "Unknown Response ID: ${id}", ("id",response.id)("response",response) ); if( response.result ) await->second->set_value( *response.result ); - else + else if( response.error ) { - FC_ASSERT( response.error ); await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",*response.error) ) ) ); } + else + await->second->set_value( fc::variant() ); _awaiting.erase(await); } diff --git a/tests/api.cpp b/tests/api.cpp index ec7a2ef..5f19f5c 100644 --- a/tests/api.cpp +++ b/tests/api.cpp @@ -8,9 +8,10 @@ class calculator public: int32_t add( int32_t a, int32_t b ); // not implemented int32_t sub( int32_t a, int32_t b ); // not implemented + void on_result( const std::function& cb ); }; -FC_API( calculator, (add)(sub) ) +FC_API( calculator, (add)(sub)(on_result) ) class login_api @@ -31,14 +32,18 @@ using namespace fc; class some_calculator { public: - int32_t add( int32_t a, int32_t b ) { return a+b; } - int32_t sub( int32_t a, int32_t b ) { return a-b; } + int32_t add( int32_t a, int32_t b ) { wlog("."); if( _cb ) _cb(a+b); return a+b; } + int32_t sub( int32_t a, int32_t b ) { wlog(".");if( _cb ) _cb(a-b); return a-b; } + void on_result( const std::function& cb ) { wlog( "set callback" ); _cb = cb; return ; } + std::function _cb; }; class variant_calculator { public: double add( fc::variant a, fc::variant b ) { return a.as_double()+b.as_double(); } double sub( fc::variant a, fc::variant b ) { return a.as_double()-b.as_double(); } + void on_result( const std::function& cb ) { wlog("set callback"); _cb = cb; return ; } + std::function _cb; }; using namespace fc::http; @@ -46,7 +51,7 @@ using namespace fc::rpc; int main( int argc, char** argv ) { - { + try { fc::api calc_api( std::make_shared() ); fc::http::websocket_server server; @@ -69,6 +74,7 @@ int main( int argc, char** argv ) auto apic = std::make_shared(*con); auto remote_login_api = apic->get_remote_api(); auto remote_calc = remote_login_api->get_calc(); + remote_calc->on_result( []( uint32_t r ) { elog( "callback result ${r}", ("r",r) ); } ); wdump((remote_calc->add( 4, 5 ))); } catch ( const fc::exception& e ) { @@ -76,6 +82,10 @@ int main( int argc, char** argv ) } } wlog( "exit scope" ); + } + catch( const fc::exception& e ) + { + edump((e.to_detail_string())); } wlog( "returning now..." );