From e506e4f4bec28d44830265eba7c08be3526e3fff Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Mon, 30 Mar 2015 16:56:28 -0400 Subject: [PATCH 01/10] added listen on a specific endpoint/port pair --- include/fc/network/http/websocket.hpp | 2 ++ src/network/http/websocket.cpp | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp index 5d0e554..fbc1c92 100644 --- a/include/fc/network/http/websocket.hpp +++ b/include/fc/network/http/websocket.hpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace fc { namespace http { namespace detail { @@ -38,6 +39,7 @@ namespace fc { namespace http { void on_connection( const on_connection_handler& handler); void listen( uint16_t port ); + void listen( const fc::ip::endpoint& ep ); void start_accept(); private: diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index 740f603..777e76b 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -200,6 +200,10 @@ namespace fc { namespace http { { my->_server.listen(port); } + void websocket_server::listen( const fc::ip::endpoint& ep ) + { + my->_server.listen( boost::asio::ip::tcp::endpoint( boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); + } void websocket_server::start_accept() { my->_server.start_accept(); From bcd642e31f5dfcfc9398e478b40c63b91883b4ad Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Mon, 30 Mar 2015 18:34:04 -0400 Subject: [PATCH 02/10] fix memory leak in circular shared ptrs --- include/fc/rpc/websocket_api.hpp | 14 +++++++------- tests/api.cpp | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/include/fc/rpc/websocket_api.hpp b/include/fc/rpc/websocket_api.hpp index aa2f3f6..36bd8e7 100644 --- a/include/fc/rpc/websocket_api.hpp +++ b/include/fc/rpc/websocket_api.hpp @@ -10,7 +10,7 @@ namespace fc { namespace rpc { class websocket_api_connection : public api_connection { public: - websocket_api_connection( fc::http::websocket_connection_ptr c ) + websocket_api_connection( fc::http::websocket_connection& c ) :_connection(c) { _rpc_state.add_method( "call", [this]( const variants& args ) -> variant { @@ -19,7 +19,7 @@ namespace fc { namespace rpc { args[1].as_string(), args[2].get_array() ); }); - _connection->on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); + _connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); } virtual variant send_call( api_id_type api_id, @@ -27,7 +27,7 @@ namespace fc { namespace rpc { const variants& args = variants() ) override { auto request = _rpc_state.start_remote_call( "call", {api_id, method_name, args} ); - _connection->send_message( fc::json::to_string(request) ); + _connection.send_message( fc::json::to_string(request) ); return _rpc_state.wait_for_response( *request.id ); } @@ -43,14 +43,14 @@ namespace fc { namespace rpc { auto result = _rpc_state.local_call( call.method, call.params ); if( call.id ) { - _connection->send_message( fc::json::to_string( response( *call.id, result ) ) ); + _connection.send_message( fc::json::to_string( response( *call.id, result ) ) ); } } catch ( const fc::exception& e ) { if( call.id ) { - _connection->send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + _connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); } } } @@ -60,8 +60,8 @@ namespace fc { namespace rpc { _rpc_state.handle_reply( reply ); } } - fc::http::websocket_connection_ptr _connection; - fc::rpc::state _rpc_state; + fc::http::websocket_connection& _connection; + fc::rpc::state _rpc_state; }; } } // namespace fc::rpc diff --git a/tests/api.cpp b/tests/api.cpp index 440479c..ec7a2ef 100644 --- a/tests/api.cpp +++ b/tests/api.cpp @@ -51,7 +51,7 @@ int main( int argc, char** argv ) fc::http::websocket_server server; server.on_connection([&]( const websocket_connection_ptr& c ){ - auto wsc = std::make_shared(c); + auto wsc = std::make_shared(*c); auto login = std::make_shared(); login->calc = calc_api; wsc->register_api(fc::api(login)); @@ -66,7 +66,7 @@ int main( int argc, char** argv ) try { fc::http::websocket_client client; auto con = client.connect( "ws://localhost:8090" ); - auto apic = std::make_shared(con); + auto apic = std::make_shared(*con); auto remote_login_api = apic->get_remote_api(); auto remote_calc = remote_login_api->get_calc(); wdump((remote_calc->add( 4, 5 ))); From 1fb31737a733fca75165f13e0252787108970fc9 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Tue, 31 Mar 2015 11:31:56 -0400 Subject: [PATCH 03/10] RPC now supports remote callbacks to_variant now skips null optional members on reflected objects. --- include/fc/api.hpp | 1 + include/fc/reflect/variant.hpp | 13 +- include/fc/rpc/api_connection.hpp | 271 ++++++++++++++++++++++-------- include/fc/rpc/websocket_api.hpp | 73 +++++--- src/rpc/state.cpp | 5 +- tests/api.cpp | 18 +- 6 files changed, 280 insertions(+), 101 deletions(-) diff --git a/include/fc/api.hpp b/include/fc/api.hpp index cf42287..e84f50b 100644 --- a/include/fc/api.hpp +++ b/include/fc/api.hpp @@ -61,6 +61,7 @@ namespace fc { friend bool operator == ( const api& a, const api& b ) { return a._data == b._data && a._vtable == b._vtable; } friend bool operator != ( const api& a, const api& b ) { return !(a._data == b._data && a._vtable == b._vtable); } + uint64_t get_handle()const { return uint64_t(_data.get()); } vtable_type& operator*()const { FC_ASSERT( _vtable ); return *_vtable; } vtable_type* operator->()const { FC_ASSERT( _vtable ); return _vtable.get(); } diff --git a/include/fc/reflect/variant.hpp b/include/fc/reflect/variant.hpp index bc14fc4..f48fe9a 100644 --- a/include/fc/reflect/variant.hpp +++ b/include/fc/reflect/variant.hpp @@ -20,9 +20,20 @@ namespace fc template void operator()( const char* name )const { - vo(name,(val.*member)); + this->add(vo,name,(val.*member)); } + private: + template + void add( mutable_variant_object& vo, const char* name, const optional& v )const + { + if( v.valid() ) + vo(name,*v); + } + template + void add( mutable_variant_object& vo, const char* name, const M& v )const + { vo(name,v); } + mutable_variant_object& vo; const T& val; }; diff --git a/include/fc/rpc/api_connection.hpp b/include/fc/rpc/api_connection.hpp index a4781c9..aece532 100644 --- a/include/fc/rpc/api_connection.hpp +++ b/include/fc/rpc/api_connection.hpp @@ -15,6 +15,58 @@ namespace fc { typedef uint32_t api_id_type; namespace detail { + template + class callback_functor + { + public: + typedef typename std::function::result_type result_type; + + callback_functor( fc::api_connection& con, uint64_t id ) + :_callback_id(id),_api_connection(con){} + + template + result_type operator()( Args... args )const; + + private: + uint64_t _callback_id; + fc::api_connection& _api_connection; + }; + + template + std::function bind_first_arg( const std::function& f, Arg0 a0 ) + { + return [=]( Args... args ) { return f( a0, args... ); }; + } + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + return f(); + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + return call_generic( bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); + } + + template + std::function to_generic( const std::function& f ) + { + return [=]( const variants& args ) { + return variant( call_generic( f, args.begin(), args.end() ) ); + }; + } + + template + std::function to_generic( const std::function& f ) + { + return [=]( const variants& args ) { + call_generic( f, args.begin(), args.end() ); + return variant(); + }; + } + class generic_api { public: @@ -36,31 +88,50 @@ namespace fc { return _methods[method_id](args); } + fc::api_connection& get_connection(){ return *_api_connection; } + + private: friend struct api_visitor; + template + std::function bind_first_arg( const std::function& f, Arg0 a0 )const + { + return [=]( Args... args ) { return f( a0, args... ); }; + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const + { + return f(); + } + + template + R call_generic( const std::function,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + callback_functor arg0( *this, a0->as() ); + return call_generic( this->bind_first_arg,Args...>( f, std::function(arg0) ), a0+1, e ); + } + template + R call_generic( const std::function&,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + callback_functor arg0( get_connection(), a0->as() ); + return call_generic( this->bind_first_arg&,Args...>( f, arg0 ), a0+1, e ); + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + return call_generic( this->bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); + } + struct api_visitor { api_visitor( generic_api& a, const std::shared_ptr& s ):api(a),_api_con(s){ } - template - std::function bind_first_arg( const std::function& f, Arg0 a0 )const - { - return [=]( Args... args ) { return f( a0, args... ); }; - } - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const - { - return f(); - } - - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const - { - FC_ASSERT( a0 != e ); - return call_generic( bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); - } - template std::function to_generic( const std::function(Args...)>& f )const; @@ -68,21 +139,10 @@ namespace fc { std::function to_generic( const std::function>(Args...)>& f )const; template - std::function to_generic( const std::function& f )const - { - return [=]( const variants& args ) { - return variant( call_generic( f, args.begin(), args.end() ) ); - }; - } + std::function to_generic( const std::function& f )const; template - std::function to_generic( const std::function& f )const - { - return [=]( const variants& args ) { - call_generic( f, args.begin(), args.end() ); - return variant(); - }; - } + std::function to_generic( const std::function& f )const; template void operator()( const char* name, std::function& memb )const { @@ -120,24 +180,49 @@ namespace fc { } /** makes calls to the remote server */ - virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) = 0; + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) = 0; + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) = 0; + virtual void send_notice( uint64_t callback_id, variants args = variants() ) = 0; variant receive_call( api_id_type api_id, const string& method_name, const variants& args = variants() )const { - //wdump( (api_id)(method_name)(args) ); FC_ASSERT( _local_apis.size() > api_id ); return _local_apis[api_id]->call( method_name, args ); } + variant receive_callback( uint64_t callback_id, const variants& args = variants() )const + { + FC_ASSERT( _local_callbacks.size() > callback_id ); + return _local_callbacks[callback_id]( args ); + } + void receive_notice( uint64_t callback_id, const variants& args = variants() )const + { + FC_ASSERT( _local_callbacks.size() > callback_id ); + _local_callbacks[callback_id]( args ); + } template api_id_type register_api( const Interface& a ) { + auto handle = a.get_handle(); + auto itr = _handle_to_id.find(handle); + if( itr != _handle_to_id.end() ) return itr->second; + _local_apis.push_back( std::unique_ptr( new detail::generic_api(a, shared_from_this() ) ) ); + _handle_to_id[handle] = _local_apis.size() - 1; return _local_apis.size() - 1; } + template + uint64_t register_callback( const std::function& cb ) + { + _local_callbacks.push_back( detail::to_generic( cb ) ); + return _local_callbacks.size() - 1; + } + private: - std::vector< std::unique_ptr > _local_apis; + std::vector< std::unique_ptr > _local_apis; + std::map< uint64_t, api_id_type > _handle_to_id; + std::vector< std::function > _local_callbacks; struct api_visitor @@ -160,13 +245,24 @@ namespace fc { template static fc::api from_variant( const variant& v, - fc::api* /*used for template deduction*/, - const std::shared_ptr& con - ) + fc::api* /*used for template deduction*/, + const std::shared_ptr& con + ) { return con->get_remote_api( v.as_uint64() ); } + template + static fc::variant convert_callbacks( const std::shared_ptr&, const T& v ) + { + return fc::variant(v); + } + + template + static fc::variant convert_callbacks( const std::shared_ptr& con, const std::function& v ) + { + return con->register_callback( v ); + } template void operator()( const char* name, std::function& memb )const @@ -174,10 +270,19 @@ namespace fc { auto con = _connection; auto api_id = _api_id; memb = [con,api_id,name]( Args... args ) { - auto var_result = con->send_call( api_id, name, {args...} ); + auto var_result = con->send_call( api_id, name, { convert_callbacks(con,args)...} ); return from_variant( var_result, (Result*)nullptr, con ); }; } + template + void operator()( const char* name, std::function& memb )const + { + auto con = _connection; + auto api_id = _api_id; + memb = [con,api_id,name]( Args... args ) { + con->send_call( api_id, name, { convert_callbacks(con,args)...} ); + }; + } }; }; @@ -185,11 +290,22 @@ namespace fc { { public: /** makes calls to the remote server */ - virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) override + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) override { FC_ASSERT( _remote_connection ); - return _remote_connection->receive_call( api_id, method_name, args ); + return _remote_connection->receive_call( api_id, method_name, std::move(args) ); } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override + { + FC_ASSERT( _remote_connection ); + return _remote_connection->receive_callback( callback_id, args ); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) override + { + FC_ASSERT( _remote_connection ); + _remote_connection->receive_notice( callback_id, args ); + } + void set_remote_connection( const std::shared_ptr& rc ) { @@ -214,8 +330,9 @@ namespace fc { const std::function(Args...)>& f )const { auto api_con = _api_con; + auto gapi = &api; return [=]( const variants& args ) { - auto api_result = call_generic( f, args.begin(), args.end() ); + auto api_result = gapi->call_generic( f, args.begin(), args.end() ); return api_con->register_api( api_result ); }; } @@ -224,46 +341,60 @@ namespace fc { const std::function>(Args...)>& f )const { auto api_con = _api_con; + auto gapi = &api; return [=]( const variants& args )-> fc::variant { - auto api_result = call_generic( f, args.begin(), args.end() ); + auto api_result = gapi->call_generic( f, args.begin(), args.end() ); if( api_result ) return api_con->register_api( *api_result ); return variant(); }; } - - /* - class json_api_connection : public api_connection + template + std::function detail::generic_api::api_visitor::to_generic( const std::function& f )const { - public: - json_api_connection(){}; - void set_json_connection( const std::shared_ptr& json_con ) - { - json_con->add_method( "call", [this]( const variants& args ) -> variant { - FC_ASSERT( args.size() == 3 && args[2].is_array() ); - return this->receive_call( args[0].as_uint64(), - args[1].as_string(), - args[2].get_array() ); + generic_api* gapi = &api; + return [f,gapi]( const variants& args ) { + return variant( gapi->call_generic( f, args.begin(), args.end() ) ); + }; + } - }); - } + template + std::function detail::generic_api::api_visitor::to_generic( const std::function& f )const + { + generic_api* gapi = &api; + return [f,gapi]( const variants& args ) { + gapi->call_generic( f, args.begin(), args.end() ); + return variant(); + }; + } - json_api_connection( const std::shared_ptr& json_con ) - { - set_json_connection( json_con ); - } + namespace detail { + template + template + typename callback_functor::result_type callback_functor::operator()( Args... args )const + { + _api_connection.send_callback( _callback_id, fc::variants{ args... } ).template as< result_type >(); + } - virtual variant send_call( api_id_type api_id, - const string& method_name, - const variants& args = variants() )const override - { - return _json_con->async_call( "call", {api_id, method_name, args} ).wait(); - } - private: - std::shared_ptr _json_con; + template + class callback_functor + { + public: + typedef void result_type; - }; - */ + callback_functor( fc::api_connection& con, uint64_t id ) + :_callback_id(id),_api_connection(con){} + + void operator()( Args... args )const + { + _api_connection.send_notice( _callback_id, fc::variants{ args... } ); + } + + private: + uint64_t _callback_id; + fc::api_connection& _api_connection; + }; + } // namespace detail } // fc diff --git a/include/fc/rpc/websocket_api.hpp b/include/fc/rpc/websocket_api.hpp index 36bd8e7..833c0e1 100644 --- a/include/fc/rpc/websocket_api.hpp +++ b/include/fc/rpc/websocket_api.hpp @@ -19,45 +19,70 @@ namespace fc { namespace rpc { args[1].as_string(), args[2].get_array() ); }); + _rpc_state.add_method( "notice", [this]( const variants& args ) -> variant { + FC_ASSERT( args.size() == 2 && args[1].is_array() ); + this->receive_notice( args[0].as_uint64(), args[1].get_array() ); + return variant(); + }); + _rpc_state.add_method( "callback", [this]( const variants& args ) -> variant { + FC_ASSERT( args.size() == 2 && args[1].is_array() ); + this->receive_callback( args[0].as_uint64(), args[1].get_array() ); + return variant(); + }); _connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); } virtual variant send_call( api_id_type api_id, - const string& method_name, - const variants& args = variants() ) override + string method_name, + variants args = variants() ) override { - auto request = _rpc_state.start_remote_call( "call", {api_id, method_name, args} ); + auto request = _rpc_state.start_remote_call( "call", {api_id, std::move(method_name), std::move(args) } ); _connection.send_message( fc::json::to_string(request) ); return _rpc_state.wait_for_response( *request.id ); } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override + { + auto request = _rpc_state.start_remote_call( "callback", {callback_id, std::move(args) } ); + _connection.send_message( fc::json::to_string(request) ); + return _rpc_state.wait_for_response( *request.id ); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) override + { + fc::rpc::request req{ optional(), "notice", {callback_id, std::move(args)}}; + _connection.send_message( fc::json::to_string(req) ); + } protected: void on_message( const std::string& message ) - { - auto var = fc::json::from_string(message); - const auto& var_obj = var.get_object(); - if( var_obj.contains( "method" ) ) - { - auto call = var.as(); - try { - auto result = _rpc_state.local_call( call.method, call.params ); - if( call.id ) - { - _connection.send_message( fc::json::to_string( response( *call.id, result ) ) ); - } - } - catch ( const fc::exception& e ) + { + try { + auto var = fc::json::from_string(message); + const auto& var_obj = var.get_object(); + if( var_obj.contains( "method" ) ) { - if( call.id ) + auto call = var.as(); + try { + auto result = _rpc_state.local_call( call.method, call.params ); + if( call.id ) + { + _connection.send_message( fc::json::to_string( response( *call.id, result ) ) ); + } + } + catch ( const fc::exception& e ) { - _connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + if( call.id ) + { + _connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + } } } - } - else - { - auto reply = var.as(); - _rpc_state.handle_reply( reply ); + else + { + auto reply = var.as(); + _rpc_state.handle_reply( reply ); + } + } catch ( const fc::exception& e ) { + wdump((e.to_detail_string())); } } fc::http::websocket_connection& _connection; diff --git a/src/rpc/state.cpp b/src/rpc/state.cpp index 4a67faa..eec8584 100644 --- a/src/rpc/state.cpp +++ b/src/rpc/state.cpp @@ -31,11 +31,12 @@ void state::handle_reply( const response& response ) FC_ASSERT( await != _awaiting.end(), "Unknown Response ID: ${id}", ("id",response.id)("response",response) ); if( response.result ) await->second->set_value( *response.result ); - else + else if( response.error ) { - FC_ASSERT( response.error ); await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",*response.error) ) ) ); } + else + await->second->set_value( fc::variant() ); _awaiting.erase(await); } diff --git a/tests/api.cpp b/tests/api.cpp index ec7a2ef..5f19f5c 100644 --- a/tests/api.cpp +++ b/tests/api.cpp @@ -8,9 +8,10 @@ class calculator public: int32_t add( int32_t a, int32_t b ); // not implemented int32_t sub( int32_t a, int32_t b ); // not implemented + void on_result( const std::function& cb ); }; -FC_API( calculator, (add)(sub) ) +FC_API( calculator, (add)(sub)(on_result) ) class login_api @@ -31,14 +32,18 @@ using namespace fc; class some_calculator { public: - int32_t add( int32_t a, int32_t b ) { return a+b; } - int32_t sub( int32_t a, int32_t b ) { return a-b; } + int32_t add( int32_t a, int32_t b ) { wlog("."); if( _cb ) _cb(a+b); return a+b; } + int32_t sub( int32_t a, int32_t b ) { wlog(".");if( _cb ) _cb(a-b); return a-b; } + void on_result( const std::function& cb ) { wlog( "set callback" ); _cb = cb; return ; } + std::function _cb; }; class variant_calculator { public: double add( fc::variant a, fc::variant b ) { return a.as_double()+b.as_double(); } double sub( fc::variant a, fc::variant b ) { return a.as_double()-b.as_double(); } + void on_result( const std::function& cb ) { wlog("set callback"); _cb = cb; return ; } + std::function _cb; }; using namespace fc::http; @@ -46,7 +51,7 @@ using namespace fc::rpc; int main( int argc, char** argv ) { - { + try { fc::api calc_api( std::make_shared() ); fc::http::websocket_server server; @@ -69,6 +74,7 @@ int main( int argc, char** argv ) auto apic = std::make_shared(*con); auto remote_login_api = apic->get_remote_api(); auto remote_calc = remote_login_api->get_calc(); + remote_calc->on_result( []( uint32_t r ) { elog( "callback result ${r}", ("r",r) ); } ); wdump((remote_calc->add( 4, 5 ))); } catch ( const fc::exception& e ) { @@ -76,6 +82,10 @@ int main( int argc, char** argv ) } } wlog( "exit scope" ); + } + catch( const fc::exception& e ) + { + edump((e.to_detail_string())); } wlog( "returning now..." ); From 55ee57040ac76c324de96eb61befe27120a3ab10 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Tue, 31 Mar 2015 17:45:01 -0400 Subject: [PATCH 04/10] added support for unhandled extension and mapping the first registered API to global namespace --- include/fc/rpc/api_connection.hpp | 201 +++++++++++++++--------------- include/fc/rpc/state.hpp | 3 + include/fc/rpc/websocket_api.hpp | 7 ++ src/rpc/state.cpp | 6 + 4 files changed, 117 insertions(+), 100 deletions(-) diff --git a/include/fc/rpc/api_connection.hpp b/include/fc/rpc/api_connection.hpp index aece532..1f1c6f5 100644 --- a/include/fc/rpc/api_connection.hpp +++ b/include/fc/rpc/api_connection.hpp @@ -67,101 +67,102 @@ namespace fc { }; } - class generic_api - { - public: - template - generic_api( const Api& a, const std::shared_ptr& c ); - - generic_api( const generic_api& cpy ) = delete; - - variant call( const string& name, const variants& args ) - { - auto itr = _by_name.find(name); - FC_ASSERT( itr != _by_name.end() ); - return call( itr->second, args ); - } - - variant call( uint32_t method_id, const variants& args ) - { - FC_ASSERT( method_id < _methods.size() ); - return _methods[method_id](args); - } - - fc::api_connection& get_connection(){ return *_api_connection; } - - - private: - friend struct api_visitor; - - template - std::function bind_first_arg( const std::function& f, Arg0 a0 )const - { - return [=]( Args... args ) { return f( a0, args... ); }; - } - - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const - { - return f(); - } - - template - R call_generic( const std::function,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) - { - FC_ASSERT( a0 != e ); - callback_functor arg0( *this, a0->as() ); - return call_generic( this->bind_first_arg,Args...>( f, std::function(arg0) ), a0+1, e ); - } - template - R call_generic( const std::function&,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) - { - FC_ASSERT( a0 != e ); - callback_functor arg0( get_connection(), a0->as() ); - return call_generic( this->bind_first_arg&,Args...>( f, arg0 ), a0+1, e ); - } - - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) - { - FC_ASSERT( a0 != e ); - return call_generic( this->bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); - } - - struct api_visitor - { - api_visitor( generic_api& a, const std::shared_ptr& s ):api(a),_api_con(s){ } - - template - std::function to_generic( const std::function(Args...)>& f )const; - - template - std::function to_generic( const std::function>(Args...)>& f )const; - - template - std::function to_generic( const std::function& f )const; - - template - std::function to_generic( const std::function& f )const; - - template - void operator()( const char* name, std::function& memb )const { - api._methods.emplace_back( to_generic( memb ) ); - api._by_name[name] = api._methods.size() - 1; - } - - generic_api& api; - const std::shared_ptr& _api_con; - }; - - - std::shared_ptr _api_connection; - fc::any _api; - std::map< std::string, uint32_t > _by_name; - std::vector< std::function > _methods; - }; // class generic_api } // namespace detail + class generic_api + { + public: + template + generic_api( const Api& a, const std::shared_ptr& c ); + + generic_api( const generic_api& cpy ) = delete; + + variant call( const string& name, const variants& args ) + { + auto itr = _by_name.find(name); + FC_ASSERT( itr != _by_name.end() ); + return call( itr->second, args ); + } + + variant call( uint32_t method_id, const variants& args ) + { + FC_ASSERT( method_id < _methods.size() ); + return _methods[method_id](args); + } + + fc::api_connection& get_connection(){ return *_api_connection; } + + + private: + friend struct api_visitor; + + template + std::function bind_first_arg( const std::function& f, Arg0 a0 )const + { + return [=]( Args... args ) { return f( a0, args... ); }; + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const + { + return f(); + } + + template + R call_generic( const std::function,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + detail::callback_functor arg0( *this, a0->as() ); + return call_generic( this->bind_first_arg,Args...>( f, std::function(arg0) ), a0+1, e ); + } + template + R call_generic( const std::function&,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + detail::callback_functor arg0( get_connection(), a0->as() ); + return call_generic( this->bind_first_arg&,Args...>( f, arg0 ), a0+1, e ); + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + return call_generic( this->bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); + } + + struct api_visitor + { + api_visitor( generic_api& a, const std::shared_ptr& s ):api(a),_api_con(s){ } + + template + std::function to_generic( const std::function(Args...)>& f )const; + + template + std::function to_generic( const std::function>(Args...)>& f )const; + + template + std::function to_generic( const std::function& f )const; + + template + std::function to_generic( const std::function& f )const; + + template + void operator()( const char* name, std::function& memb )const { + api._methods.emplace_back( to_generic( memb ) ); + api._by_name[name] = api._methods.size() - 1; + } + + generic_api& api; + const std::shared_ptr& _api_con; + }; + + + std::shared_ptr _api_connection; + fc::any _api; + std::map< std::string, uint32_t > _by_name; + std::vector< std::function > _methods; + }; // class generic_api + class api_connection : public std::enable_shared_from_this @@ -207,7 +208,7 @@ namespace fc { auto itr = _handle_to_id.find(handle); if( itr != _handle_to_id.end() ) return itr->second; - _local_apis.push_back( std::unique_ptr( new detail::generic_api(a, shared_from_this() ) ) ); + _local_apis.push_back( std::unique_ptr( new generic_api(a, shared_from_this() ) ) ); _handle_to_id[handle] = _local_apis.size() - 1; return _local_apis.size() - 1; } @@ -220,7 +221,7 @@ namespace fc { } private: - std::vector< std::unique_ptr > _local_apis; + std::vector< std::unique_ptr > _local_apis; std::map< uint64_t, api_id_type > _handle_to_id; std::vector< std::function > _local_callbacks; @@ -319,14 +320,14 @@ namespace fc { }; template - detail::generic_api::generic_api( const Api& a, const std::shared_ptr& c ) + generic_api::generic_api( const Api& a, const std::shared_ptr& c ) :_api_connection(c),_api(a) { boost::any_cast(a)->visit( api_visitor( *this, _api_connection ) ); } template - std::function detail::generic_api::api_visitor::to_generic( + std::function generic_api::api_visitor::to_generic( const std::function(Args...)>& f )const { auto api_con = _api_con; @@ -337,7 +338,7 @@ namespace fc { }; } template - std::function detail::generic_api::api_visitor::to_generic( + std::function generic_api::api_visitor::to_generic( const std::function>(Args...)>& f )const { auto api_con = _api_con; @@ -350,7 +351,7 @@ namespace fc { }; } template - std::function detail::generic_api::api_visitor::to_generic( const std::function& f )const + std::function generic_api::api_visitor::to_generic( const std::function& f )const { generic_api* gapi = &api; return [f,gapi]( const variants& args ) { @@ -359,7 +360,7 @@ namespace fc { } template - std::function detail::generic_api::api_visitor::to_generic( const std::function& f )const + std::function generic_api::api_visitor::to_generic( const std::function& f )const { generic_api* gapi = &api; return [f,gapi]( const variants& args ) { diff --git a/include/fc/rpc/state.hpp b/include/fc/rpc/state.hpp index f870750..3c36bcf 100644 --- a/include/fc/rpc/state.hpp +++ b/include/fc/rpc/state.hpp @@ -45,10 +45,13 @@ namespace fc { namespace rpc { void close(); + void on_unhandled( const std::function& unhandled ); + private: uint64_t _next_id = 1; std::unordered_map::ptr> _awaiting; std::unordered_map _methods; + std::function _unhandled; }; } } // namespace fc::rpc diff --git a/include/fc/rpc/websocket_api.hpp b/include/fc/rpc/websocket_api.hpp index 833c0e1..fa0158b 100644 --- a/include/fc/rpc/websocket_api.hpp +++ b/include/fc/rpc/websocket_api.hpp @@ -19,16 +19,23 @@ namespace fc { namespace rpc { args[1].as_string(), args[2].get_array() ); }); + _rpc_state.add_method( "notice", [this]( const variants& args ) -> variant { FC_ASSERT( args.size() == 2 && args[1].is_array() ); this->receive_notice( args[0].as_uint64(), args[1].get_array() ); return variant(); }); + _rpc_state.add_method( "callback", [this]( const variants& args ) -> variant { FC_ASSERT( args.size() == 2 && args[1].is_array() ); this->receive_callback( args[0].as_uint64(), args[1].get_array() ); return variant(); }); + + _rpc_state.on_unhandled( [&]( const std::string& method_name, const variants& args ){ + return this->receive_call( 0, method_name, args ); + }); + _connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); } diff --git a/src/rpc/state.cpp b/src/rpc/state.cpp index eec8584..b5cb95f 100644 --- a/src/rpc/state.cpp +++ b/src/rpc/state.cpp @@ -21,6 +21,8 @@ void state::remove_method( const fc::string& name ) variant state::local_call( const string& method_name, const variants& args ) { auto method_itr = _methods.find(method_name); + if( method_itr == _methods.end() && _unhandled ) + return _unhandled( method_name, args ); FC_ASSERT( method_itr != _methods.end(), "Unknown Method: ${name}", ("name",method_name) ); return method_itr->second(args); } @@ -58,5 +60,9 @@ void state::close() item.second->set_exception( fc::exception_ptr(new FC_EXCEPTION( eof_exception, "connection closed" )) ); _awaiting.clear(); } +void state::on_unhandled( const std::function& unhandled ) +{ + _unhandled = unhandled; +} } } // namespace fc::rpc From 633ab3f32d5ac42d36b78f4c6c4675a3f5ba9ab9 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Tue, 31 Mar 2015 17:45:08 -0400 Subject: [PATCH 05/10] Adding cli wrapper to expose APIs to the CLI --- include/fc/rpc/cli.hpp | 75 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 include/fc/rpc/cli.hpp diff --git a/include/fc/rpc/cli.hpp b/include/fc/rpc/cli.hpp new file mode 100644 index 0000000..076cae3 --- /dev/null +++ b/include/fc/rpc/cli.hpp @@ -0,0 +1,75 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace fc { namespace rpc { + + /** + * Provides a simple wrapper for RPC calls to a given interface. + */ + class cli : public api_connection + { + public: + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) + { + FC_ASSERT(false); + } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) + { + FC_ASSERT(false); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) + { + FC_ASSERT(false); + } + + void start() + { + _run_complete = fc::async( [&](){ run(); } ); + } + void stop() + { + _run_complete.cancel(); + _run_complete.wait(); + } + void format_result( const string& method, std::function formatter) + { + _result_formatters[method] = formatter; + } + private: + void run() + { + while( !_run_complete.canceled() ) + { + std::cout << ">>> "; + std::string line; + fc::getline( fc::cin, line ); + auto line_stream = std::make_shared( line ); + buffered_istream bstream(line_stream); + fc::variant method; + fc::variants args; + + method = fc::json::from_stream( bstream ); + while( true ) + { + try { + args.push_back( fc::json::from_stream( bstream ) ); + } catch ( const fc::eof_exception& eof ){} + } + auto result = receive_call( 0, method.get_string(), args ); + auto itr = _result_formatters.find( method.get_string() ); + if( itr == _result_formatters.end() ) + { + std::cout << fc::json::to_pretty_string( result ) << "\n"; + } + else + std::cout << itr->second( result, args ) << "\n"; + } + } + std::map > _result_formatters; + fc::future _run_complete; + }; +} } From 8b5e2e7613ec8185b4df90ed7e82ec18fa841f50 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Tue, 31 Mar 2015 18:46:05 -0400 Subject: [PATCH 06/10] adding helper to get a list of variants from a string --- include/fc/io/json.hpp | 1 + include/fc/rpc/cli.hpp | 58 +++++++++++++++++++++++++----------------- src/io/json.cpp | 15 +++++++++++ 3 files changed, 50 insertions(+), 24 deletions(-) diff --git a/include/fc/io/json.hpp b/include/fc/io/json.hpp index 983025f..f423803 100644 --- a/include/fc/io/json.hpp +++ b/include/fc/io/json.hpp @@ -30,6 +30,7 @@ namespace fc static variant from_stream( buffered_istream& in, parse_type ptype = legacy_parser ); static variant from_string( const string& utf8_str, parse_type ptype = legacy_parser ); + static variants variants_from_string( const string& utf8_str, parse_type ptype = legacy_parser ); static string to_string( const variant& v ); static string to_pretty_string( const variant& v ); diff --git a/include/fc/rpc/cli.hpp b/include/fc/rpc/cli.hpp index 076cae3..2af3553 100644 --- a/include/fc/rpc/cli.hpp +++ b/include/fc/rpc/cli.hpp @@ -13,6 +13,13 @@ namespace fc { namespace rpc { class cli : public api_connection { public: + ~cli() + { + if( _run_complete.valid() ) + { + stop(); + } + } virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) { FC_ASSERT(false); @@ -35,6 +42,7 @@ namespace fc { namespace rpc { _run_complete.cancel(); _run_complete.wait(); } + void wait(){ _run_complete.wait(); } void format_result( const string& method, std::function formatter) { _result_formatters[method] = formatter; @@ -42,32 +50,34 @@ namespace fc { namespace rpc { private: void run() { - while( !_run_complete.canceled() ) - { - std::cout << ">>> "; - std::string line; - fc::getline( fc::cin, line ); - auto line_stream = std::make_shared( line ); - buffered_istream bstream(line_stream); - fc::variant method; - fc::variants args; - - method = fc::json::from_stream( bstream ); - while( true ) + while( !fc::cin.eof() && !_run_complete.canceled() ) { try { - args.push_back( fc::json::from_stream( bstream ) ); - } catch ( const fc::eof_exception& eof ){} - } - auto result = receive_call( 0, method.get_string(), args ); - auto itr = _result_formatters.find( method.get_string() ); - if( itr == _result_formatters.end() ) - { - std::cout << fc::json::to_pretty_string( result ) << "\n"; - } - else - std::cout << itr->second( result, args ) << "\n"; - } + std::cout << ">>> "; + std::cout.flush(); + std::string line; + fc::getline( fc::cin, line ); + std::cout << line << "\n"; + line += char(EOF); + fc::variants args = fc::json::variants_from_string(line);; + if( args.size() == 0 ) continue; + + const string& method = args[0].get_string(); + + auto result = receive_call( 0, method, variants( args.begin()+1,args.end() ) ); + auto itr = _result_formatters.find( method ); + if( itr == _result_formatters.end() ) + { + std::cout << fc::json::to_pretty_string( result ) << "\n"; + } + else + std::cout << itr->second( result, args ) << "\n"; + } + catch ( const fc::exception& e ) + { + edump((e.to_detail_string())); + } + } } std::map > _result_formatters; fc::future _run_complete; diff --git a/src/io/json.cpp b/src/io/json.cpp index 4ecf307..70a572c 100644 --- a/src/io/json.cpp +++ b/src/io/json.cpp @@ -427,6 +427,7 @@ namespace fc return token_from_stream( in ); case 0x04: // ^D end of transmission case EOF: + case 0: FC_THROW_EXCEPTION( eof_exception, "unexpected end of file" ); default: FC_THROW_EXCEPTION( parse_error_exception, "Unexpected char '${c}' in \"${s}\"", @@ -453,6 +454,20 @@ namespace fc } } FC_RETHROW_EXCEPTIONS( warn, "", ("str",utf8_str) ) } + variants json::variants_from_string( const std::string& utf8_str, parse_type ptype ) + { try { + variants result; + fc::stringstream in( utf8_str ); + //in.exceptions( std::ifstream::eofbit ); + try { + while( true ) + { + // result.push_back( variant_from_stream( in )); + result.push_back(json_relaxed::variant_from_stream( in )); + } + } catch ( const fc::eof_exception& ){} + return result; + } FC_RETHROW_EXCEPTIONS( warn, "", ("str",utf8_str) ) } /* void toUTF8( const char str, ostream& os ) { From c8200afadeb4c6c7762c40ef97daeadbd7afe25a Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Wed, 1 Apr 2015 10:25:57 -0400 Subject: [PATCH 07/10] better close notification and error handling --- include/fc/network/http/websocket.hpp | 3 +++ include/fc/rpc/api_connection.hpp | 3 ++- include/fc/rpc/cli.hpp | 2 +- include/fc/rpc/websocket_api.hpp | 2 ++ src/network/http/websocket.cpp | 9 ++++++--- src/rpc/state.cpp | 2 +- 6 files changed, 15 insertions(+), 6 deletions(-) diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp index fbc1c92..5c253aa 100644 --- a/include/fc/network/http/websocket.hpp +++ b/include/fc/network/http/websocket.hpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace fc { namespace http { namespace detail { @@ -23,6 +24,8 @@ namespace fc { namespace http { void set_session_data( fc::any d ){ _session_data = std::move(d); } fc::any& get_session_data() { return _session_data; } + + fc::signal closed; private: fc::any _session_data; std::function _on_message; diff --git a/include/fc/rpc/api_connection.hpp b/include/fc/rpc/api_connection.hpp index 1f1c6f5..32258e7 100644 --- a/include/fc/rpc/api_connection.hpp +++ b/include/fc/rpc/api_connection.hpp @@ -80,7 +80,7 @@ namespace fc { variant call( const string& name, const variants& args ) { auto itr = _by_name.find(name); - FC_ASSERT( itr != _by_name.end() ); + FC_ASSERT( itr != _by_name.end(), "no method with name '${name}'", ("name",name)("api",_by_name) ); return call( itr->second, args ); } @@ -220,6 +220,7 @@ namespace fc { return _local_callbacks.size() - 1; } + fc::signal closed; private: std::vector< std::unique_ptr > _local_apis; std::map< uint64_t, api_id_type > _handle_to_id; diff --git a/include/fc/rpc/cli.hpp b/include/fc/rpc/cli.hpp index 2af3553..a25e424 100644 --- a/include/fc/rpc/cli.hpp +++ b/include/fc/rpc/cli.hpp @@ -75,7 +75,7 @@ namespace fc { namespace rpc { } catch ( const fc::exception& e ) { - edump((e.to_detail_string())); + std::cout << e.to_detail_string() << "\n"; } } } diff --git a/include/fc/rpc/websocket_api.hpp b/include/fc/rpc/websocket_api.hpp index fa0158b..202a2cb 100644 --- a/include/fc/rpc/websocket_api.hpp +++ b/include/fc/rpc/websocket_api.hpp @@ -37,6 +37,7 @@ namespace fc { namespace rpc { }); _connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); + _connection.closed.connect( [this](){ closed(); } ); } virtual variant send_call( api_id_type api_id, @@ -59,6 +60,7 @@ namespace fc { namespace rpc { _connection.send_message( fc::json::to_string(req) ); } + protected: void on_message( const std::string& message ) { diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index 777e76b..5987027 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -73,7 +73,8 @@ namespace fc { namespace http { virtual void send_message( const std::string& message )override { - _ws_connection->send( message ); + auto ec = _ws_connection->send( message ); + FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) ); } virtual void close( int64_t code, const std::string& reason )override { @@ -158,13 +159,15 @@ namespace fc { namespace http { }).wait(); }); _client.set_close_handler( [=]( connection_hdl hdl ){ - _client_thread.async( [&](){ _connection.reset(); } ).wait(); + if( _connection ) + _client_thread.async( [&](){ if( _connection ) _connection->closed(); _connection.reset(); } ).wait(); if( _closed ) _closed->set_value(); }); _client.set_fail_handler( [=]( connection_hdl hdl ){ auto con = _client.get_con_from_hdl(hdl); auto message = con->get_ec().message(); - _client_thread.async( [&](){ _connection.reset(); } ).wait(); + if( _connection ) + _client_thread.async( [&](){ if( _connection ) _connection->closed(); _connection.reset(); } ).wait(); if( _connected && !_connected->ready() ) _connected->set_exception( exception_ptr( new FC_EXCEPTION( exception, "${message}", ("message",message)) ) ); if( _closed ) diff --git a/src/rpc/state.cpp b/src/rpc/state.cpp index b5cb95f..7d4063d 100644 --- a/src/rpc/state.cpp +++ b/src/rpc/state.cpp @@ -35,7 +35,7 @@ void state::handle_reply( const response& response ) await->second->set_value( *response.result ); else if( response.error ) { - await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",*response.error) ) ) ); + await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",response.error->message)("data",response) ) ) ); } else await->second->set_value( fc::variant() ); From 257ac52b91a800be25da890365b22390f6374628 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Wed, 1 Apr 2015 11:30:47 -0400 Subject: [PATCH 08/10] fix build issue --- include/fc/rpc/api_connection.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/fc/rpc/api_connection.hpp b/include/fc/rpc/api_connection.hpp index 32258e7..8b0251b 100644 --- a/include/fc/rpc/api_connection.hpp +++ b/include/fc/rpc/api_connection.hpp @@ -7,6 +7,7 @@ #include #include #include +#include //#include namespace fc { From 0e5474e1525dd56f7285904c1315042e3c4a1fe9 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Wed, 1 Apr 2015 17:06:13 -0400 Subject: [PATCH 09/10] Fix non-void function that didn't return a value --- tests/api.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/api.cpp b/tests/api.cpp index 5f19f5c..533e52b 100644 --- a/tests/api.cpp +++ b/tests/api.cpp @@ -23,7 +23,7 @@ class login_api return *calc; } fc::optional> calc; - std::set test( const std::string&, const std::string& ){}; + std::set test( const std::string&, const std::string& ) { return std::set(); } }; FC_API( login_api, (get_calc)(test) ); From a0192d1081bf47ec6d7fe59a3abd98d0499974a7 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Wed, 1 Apr 2015 17:24:33 -0400 Subject: [PATCH 10/10] unique_ptr to/from variant --- include/fc/variant.hpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/include/fc/variant.hpp b/include/fc/variant.hpp index 9772787..aff502c 100644 --- a/include/fc/variant.hpp +++ b/include/fc/variant.hpp @@ -41,6 +41,8 @@ namespace fc void from_variant( const variant& var, blob& vo ); template void to_variant( const safe& s, variant& v ); template void from_variant( const variant& v, safe& s ); + template void to_variant( const std::unique_ptr& s, variant& v ); + template void from_variant( const variant& v, std::unique_ptr& s ); template void to_variant( const static_variant& s, variant& v ); template void from_variant( const variant& v, static_variant& s ); @@ -502,6 +504,23 @@ namespace fc from_variant( var, *vo ); } } + template + void to_variant( const std::unique_ptr& var, variant& vo ) + { + if( var ) to_variant( *var, vo ); + else vo = nullptr; + } + + template + void from_variant( const variant& var, std::unique_ptr& vo ) + { + if( var.is_null() ) vo.reset(); + else if( vo ) from_variant( var, *vo ); + else { + vo.reset( new T() ); + from_variant( var, *vo ); + } + } template @@ -510,6 +529,7 @@ namespace fc template void from_variant( const variant& v, safe& s ) { s.value = v.as_uint64(); } + variant operator + ( const variant& a, const variant& b ); variant operator - ( const variant& a, const variant& b ); variant operator * ( const variant& a, const variant& b );