diff --git a/include/fc/api.hpp b/include/fc/api.hpp index cf42287..e84f50b 100644 --- a/include/fc/api.hpp +++ b/include/fc/api.hpp @@ -61,6 +61,7 @@ namespace fc { friend bool operator == ( const api& a, const api& b ) { return a._data == b._data && a._vtable == b._vtable; } friend bool operator != ( const api& a, const api& b ) { return !(a._data == b._data && a._vtable == b._vtable); } + uint64_t get_handle()const { return uint64_t(_data.get()); } vtable_type& operator*()const { FC_ASSERT( _vtable ); return *_vtable; } vtable_type* operator->()const { FC_ASSERT( _vtable ); return _vtable.get(); } diff --git a/include/fc/io/json.hpp b/include/fc/io/json.hpp index 299378f..4d97ac0 100644 --- a/include/fc/io/json.hpp +++ b/include/fc/io/json.hpp @@ -31,6 +31,7 @@ namespace fc static variant from_stream( buffered_istream& in, parse_type ptype = legacy_parser ); static variant from_string( const string& utf8_str, parse_type ptype = legacy_parser ); + static variants variants_from_string( const string& utf8_str, parse_type ptype = legacy_parser ); static string to_string( const variant& v ); static string to_pretty_string( const variant& v ); diff --git a/include/fc/network/http/websocket.hpp b/include/fc/network/http/websocket.hpp index 5d0e554..5c253aa 100644 --- a/include/fc/network/http/websocket.hpp +++ b/include/fc/network/http/websocket.hpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include namespace fc { namespace http { namespace detail { @@ -22,6 +24,8 @@ namespace fc { namespace http { void set_session_data( fc::any d ){ _session_data = std::move(d); } fc::any& get_session_data() { return _session_data; } + + fc::signal closed; private: fc::any _session_data; std::function _on_message; @@ -38,6 +42,7 @@ namespace fc { namespace http { void on_connection( const on_connection_handler& handler); void listen( uint16_t port ); + void listen( const fc::ip::endpoint& ep ); void start_accept(); private: diff --git a/include/fc/reflect/variant.hpp b/include/fc/reflect/variant.hpp index bc14fc4..f48fe9a 100644 --- a/include/fc/reflect/variant.hpp +++ b/include/fc/reflect/variant.hpp @@ -20,9 +20,20 @@ namespace fc template void operator()( const char* name )const { - vo(name,(val.*member)); + this->add(vo,name,(val.*member)); } + private: + template + void add( mutable_variant_object& vo, const char* name, const optional& v )const + { + if( v.valid() ) + vo(name,*v); + } + template + void add( mutable_variant_object& vo, const char* name, const M& v )const + { vo(name,v); } + mutable_variant_object& vo; const T& val; }; diff --git a/include/fc/rpc/api_connection.hpp b/include/fc/rpc/api_connection.hpp index a4781c9..8b0251b 100644 --- a/include/fc/rpc/api_connection.hpp +++ b/include/fc/rpc/api_connection.hpp @@ -7,6 +7,7 @@ #include #include #include +#include //#include namespace fc { @@ -15,93 +16,154 @@ namespace fc { typedef uint32_t api_id_type; namespace detail { - class generic_api + template + class callback_functor { public: - template - generic_api( const Api& a, const std::shared_ptr& c ); + typedef typename std::function::result_type result_type; - generic_api( const generic_api& cpy ) = delete; + callback_functor( fc::api_connection& con, uint64_t id ) + :_callback_id(id),_api_connection(con){} - variant call( const string& name, const variants& args ) - { - auto itr = _by_name.find(name); - FC_ASSERT( itr != _by_name.end() ); - return call( itr->second, args ); - } - - variant call( uint32_t method_id, const variants& args ) - { - FC_ASSERT( method_id < _methods.size() ); - return _methods[method_id](args); - } + template + result_type operator()( Args... args )const; private: - friend struct api_visitor; + uint64_t _callback_id; + fc::api_connection& _api_connection; + }; - struct api_visitor - { - api_visitor( generic_api& a, const std::shared_ptr& s ):api(a),_api_con(s){ } + template + std::function bind_first_arg( const std::function& f, Arg0 a0 ) + { + return [=]( Args... args ) { return f( a0, args... ); }; + } + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + return f(); + } - template - std::function bind_first_arg( const std::function& f, Arg0 a0 )const - { - return [=]( Args... args ) { return f( a0, args... ); }; - } - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const - { - return f(); - } + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + return call_generic( bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); + } - template - R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const - { - FC_ASSERT( a0 != e ); - return call_generic( bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); - } + template + std::function to_generic( const std::function& f ) + { + return [=]( const variants& args ) { + return variant( call_generic( f, args.begin(), args.end() ) ); + }; + } - template - std::function to_generic( const std::function(Args...)>& f )const; + template + std::function to_generic( const std::function& f ) + { + return [=]( const variants& args ) { + call_generic( f, args.begin(), args.end() ); + return variant(); + }; + } - template - std::function to_generic( const std::function>(Args...)>& f )const; - - template - std::function to_generic( const std::function& f )const - { - return [=]( const variants& args ) { - return variant( call_generic( f, args.begin(), args.end() ) ); - }; - } - - template - std::function to_generic( const std::function& f )const - { - return [=]( const variants& args ) { - call_generic( f, args.begin(), args.end() ); - return variant(); - }; - } - - template - void operator()( const char* name, std::function& memb )const { - api._methods.emplace_back( to_generic( memb ) ); - api._by_name[name] = api._methods.size() - 1; - } - - generic_api& api; - const std::shared_ptr& _api_con; - }; - - - std::shared_ptr _api_connection; - fc::any _api; - std::map< std::string, uint32_t > _by_name; - std::vector< std::function > _methods; - }; // class generic_api } // namespace detail + class generic_api + { + public: + template + generic_api( const Api& a, const std::shared_ptr& c ); + + generic_api( const generic_api& cpy ) = delete; + + variant call( const string& name, const variants& args ) + { + auto itr = _by_name.find(name); + FC_ASSERT( itr != _by_name.end(), "no method with name '${name}'", ("name",name)("api",_by_name) ); + return call( itr->second, args ); + } + + variant call( uint32_t method_id, const variants& args ) + { + FC_ASSERT( method_id < _methods.size() ); + return _methods[method_id](args); + } + + fc::api_connection& get_connection(){ return *_api_connection; } + + + private: + friend struct api_visitor; + + template + std::function bind_first_arg( const std::function& f, Arg0 a0 )const + { + return [=]( Args... args ) { return f( a0, args... ); }; + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e )const + { + return f(); + } + + template + R call_generic( const std::function,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + detail::callback_functor arg0( *this, a0->as() ); + return call_generic( this->bind_first_arg,Args...>( f, std::function(arg0) ), a0+1, e ); + } + template + R call_generic( const std::function&,Args...)>& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + detail::callback_functor arg0( get_connection(), a0->as() ); + return call_generic( this->bind_first_arg&,Args...>( f, arg0 ), a0+1, e ); + } + + template + R call_generic( const std::function& f, variants::const_iterator a0, variants::const_iterator e ) + { + FC_ASSERT( a0 != e ); + return call_generic( this->bind_first_arg( f, a0->as< typename std::decay::type >() ), a0+1, e ); + } + + struct api_visitor + { + api_visitor( generic_api& a, const std::shared_ptr& s ):api(a),_api_con(s){ } + + template + std::function to_generic( const std::function(Args...)>& f )const; + + template + std::function to_generic( const std::function>(Args...)>& f )const; + + template + std::function to_generic( const std::function& f )const; + + template + std::function to_generic( const std::function& f )const; + + template + void operator()( const char* name, std::function& memb )const { + api._methods.emplace_back( to_generic( memb ) ); + api._by_name[name] = api._methods.size() - 1; + } + + generic_api& api; + const std::shared_ptr& _api_con; + }; + + + std::shared_ptr _api_connection; + fc::any _api; + std::map< std::string, uint32_t > _by_name; + std::vector< std::function > _methods; + }; // class generic_api + class api_connection : public std::enable_shared_from_this @@ -120,24 +182,50 @@ namespace fc { } /** makes calls to the remote server */ - virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) = 0; + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) = 0; + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) = 0; + virtual void send_notice( uint64_t callback_id, variants args = variants() ) = 0; variant receive_call( api_id_type api_id, const string& method_name, const variants& args = variants() )const { - //wdump( (api_id)(method_name)(args) ); FC_ASSERT( _local_apis.size() > api_id ); return _local_apis[api_id]->call( method_name, args ); } + variant receive_callback( uint64_t callback_id, const variants& args = variants() )const + { + FC_ASSERT( _local_callbacks.size() > callback_id ); + return _local_callbacks[callback_id]( args ); + } + void receive_notice( uint64_t callback_id, const variants& args = variants() )const + { + FC_ASSERT( _local_callbacks.size() > callback_id ); + _local_callbacks[callback_id]( args ); + } template api_id_type register_api( const Interface& a ) { - _local_apis.push_back( std::unique_ptr( new detail::generic_api(a, shared_from_this() ) ) ); + auto handle = a.get_handle(); + auto itr = _handle_to_id.find(handle); + if( itr != _handle_to_id.end() ) return itr->second; + + _local_apis.push_back( std::unique_ptr( new generic_api(a, shared_from_this() ) ) ); + _handle_to_id[handle] = _local_apis.size() - 1; return _local_apis.size() - 1; } + template + uint64_t register_callback( const std::function& cb ) + { + _local_callbacks.push_back( detail::to_generic( cb ) ); + return _local_callbacks.size() - 1; + } + + fc::signal closed; private: - std::vector< std::unique_ptr > _local_apis; + std::vector< std::unique_ptr > _local_apis; + std::map< uint64_t, api_id_type > _handle_to_id; + std::vector< std::function > _local_callbacks; struct api_visitor @@ -160,13 +248,24 @@ namespace fc { template static fc::api from_variant( const variant& v, - fc::api* /*used for template deduction*/, - const std::shared_ptr& con - ) + fc::api* /*used for template deduction*/, + const std::shared_ptr& con + ) { return con->get_remote_api( v.as_uint64() ); } + template + static fc::variant convert_callbacks( const std::shared_ptr&, const T& v ) + { + return fc::variant(v); + } + + template + static fc::variant convert_callbacks( const std::shared_ptr& con, const std::function& v ) + { + return con->register_callback( v ); + } template void operator()( const char* name, std::function& memb )const @@ -174,10 +273,19 @@ namespace fc { auto con = _connection; auto api_id = _api_id; memb = [con,api_id,name]( Args... args ) { - auto var_result = con->send_call( api_id, name, {args...} ); + auto var_result = con->send_call( api_id, name, { convert_callbacks(con,args)...} ); return from_variant( var_result, (Result*)nullptr, con ); }; } + template + void operator()( const char* name, std::function& memb )const + { + auto con = _connection; + auto api_id = _api_id; + memb = [con,api_id,name]( Args... args ) { + con->send_call( api_id, name, { convert_callbacks(con,args)...} ); + }; + } }; }; @@ -185,11 +293,22 @@ namespace fc { { public: /** makes calls to the remote server */ - virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) override + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) override { FC_ASSERT( _remote_connection ); - return _remote_connection->receive_call( api_id, method_name, args ); + return _remote_connection->receive_call( api_id, method_name, std::move(args) ); } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override + { + FC_ASSERT( _remote_connection ); + return _remote_connection->receive_callback( callback_id, args ); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) override + { + FC_ASSERT( _remote_connection ); + _remote_connection->receive_notice( callback_id, args ); + } + void set_remote_connection( const std::shared_ptr& rc ) { @@ -203,67 +322,82 @@ namespace fc { }; template - detail::generic_api::generic_api( const Api& a, const std::shared_ptr& c ) + generic_api::generic_api( const Api& a, const std::shared_ptr& c ) :_api_connection(c),_api(a) { boost::any_cast(a)->visit( api_visitor( *this, _api_connection ) ); } template - std::function detail::generic_api::api_visitor::to_generic( + std::function generic_api::api_visitor::to_generic( const std::function(Args...)>& f )const { auto api_con = _api_con; + auto gapi = &api; return [=]( const variants& args ) { - auto api_result = call_generic( f, args.begin(), args.end() ); + auto api_result = gapi->call_generic( f, args.begin(), args.end() ); return api_con->register_api( api_result ); }; } template - std::function detail::generic_api::api_visitor::to_generic( + std::function generic_api::api_visitor::to_generic( const std::function>(Args...)>& f )const { auto api_con = _api_con; + auto gapi = &api; return [=]( const variants& args )-> fc::variant { - auto api_result = call_generic( f, args.begin(), args.end() ); + auto api_result = gapi->call_generic( f, args.begin(), args.end() ); if( api_result ) return api_con->register_api( *api_result ); return variant(); }; } - - /* - class json_api_connection : public api_connection + template + std::function generic_api::api_visitor::to_generic( const std::function& f )const { - public: - json_api_connection(){}; - void set_json_connection( const std::shared_ptr& json_con ) - { - json_con->add_method( "call", [this]( const variants& args ) -> variant { - FC_ASSERT( args.size() == 3 && args[2].is_array() ); - return this->receive_call( args[0].as_uint64(), - args[1].as_string(), - args[2].get_array() ); + generic_api* gapi = &api; + return [f,gapi]( const variants& args ) { + return variant( gapi->call_generic( f, args.begin(), args.end() ) ); + }; + } - }); - } + template + std::function generic_api::api_visitor::to_generic( const std::function& f )const + { + generic_api* gapi = &api; + return [f,gapi]( const variants& args ) { + gapi->call_generic( f, args.begin(), args.end() ); + return variant(); + }; + } - json_api_connection( const std::shared_ptr& json_con ) - { - set_json_connection( json_con ); - } + namespace detail { + template + template + typename callback_functor::result_type callback_functor::operator()( Args... args )const + { + _api_connection.send_callback( _callback_id, fc::variants{ args... } ).template as< result_type >(); + } - virtual variant send_call( api_id_type api_id, - const string& method_name, - const variants& args = variants() )const override - { - return _json_con->async_call( "call", {api_id, method_name, args} ).wait(); - } - private: - std::shared_ptr _json_con; + template + class callback_functor + { + public: + typedef void result_type; - }; - */ + callback_functor( fc::api_connection& con, uint64_t id ) + :_callback_id(id),_api_connection(con){} + + void operator()( Args... args )const + { + _api_connection.send_notice( _callback_id, fc::variants{ args... } ); + } + + private: + uint64_t _callback_id; + fc::api_connection& _api_connection; + }; + } // namespace detail } // fc diff --git a/include/fc/rpc/cli.hpp b/include/fc/rpc/cli.hpp new file mode 100644 index 0000000..a25e424 --- /dev/null +++ b/include/fc/rpc/cli.hpp @@ -0,0 +1,85 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace fc { namespace rpc { + + /** + * Provides a simple wrapper for RPC calls to a given interface. + */ + class cli : public api_connection + { + public: + ~cli() + { + if( _run_complete.valid() ) + { + stop(); + } + } + virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() ) + { + FC_ASSERT(false); + } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) + { + FC_ASSERT(false); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) + { + FC_ASSERT(false); + } + + void start() + { + _run_complete = fc::async( [&](){ run(); } ); + } + void stop() + { + _run_complete.cancel(); + _run_complete.wait(); + } + void wait(){ _run_complete.wait(); } + void format_result( const string& method, std::function formatter) + { + _result_formatters[method] = formatter; + } + private: + void run() + { + while( !fc::cin.eof() && !_run_complete.canceled() ) + { + try { + std::cout << ">>> "; + std::cout.flush(); + std::string line; + fc::getline( fc::cin, line ); + std::cout << line << "\n"; + line += char(EOF); + fc::variants args = fc::json::variants_from_string(line);; + if( args.size() == 0 ) continue; + + const string& method = args[0].get_string(); + + auto result = receive_call( 0, method, variants( args.begin()+1,args.end() ) ); + auto itr = _result_formatters.find( method ); + if( itr == _result_formatters.end() ) + { + std::cout << fc::json::to_pretty_string( result ) << "\n"; + } + else + std::cout << itr->second( result, args ) << "\n"; + } + catch ( const fc::exception& e ) + { + std::cout << e.to_detail_string() << "\n"; + } + } + } + std::map > _result_formatters; + fc::future _run_complete; + }; +} } diff --git a/include/fc/rpc/state.hpp b/include/fc/rpc/state.hpp index f870750..3c36bcf 100644 --- a/include/fc/rpc/state.hpp +++ b/include/fc/rpc/state.hpp @@ -45,10 +45,13 @@ namespace fc { namespace rpc { void close(); + void on_unhandled( const std::function& unhandled ); + private: uint64_t _next_id = 1; std::unordered_map::ptr> _awaiting; std::unordered_map _methods; + std::function _unhandled; }; } } // namespace fc::rpc diff --git a/include/fc/rpc/websocket_api.hpp b/include/fc/rpc/websocket_api.hpp index aa2f3f6..202a2cb 100644 --- a/include/fc/rpc/websocket_api.hpp +++ b/include/fc/rpc/websocket_api.hpp @@ -10,7 +10,7 @@ namespace fc { namespace rpc { class websocket_api_connection : public api_connection { public: - websocket_api_connection( fc::http::websocket_connection_ptr c ) + websocket_api_connection( fc::http::websocket_connection& c ) :_connection(c) { _rpc_state.add_method( "call", [this]( const variants& args ) -> variant { @@ -19,49 +19,83 @@ namespace fc { namespace rpc { args[1].as_string(), args[2].get_array() ); }); - _connection->on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); + + _rpc_state.add_method( "notice", [this]( const variants& args ) -> variant { + FC_ASSERT( args.size() == 2 && args[1].is_array() ); + this->receive_notice( args[0].as_uint64(), args[1].get_array() ); + return variant(); + }); + + _rpc_state.add_method( "callback", [this]( const variants& args ) -> variant { + FC_ASSERT( args.size() == 2 && args[1].is_array() ); + this->receive_callback( args[0].as_uint64(), args[1].get_array() ); + return variant(); + }); + + _rpc_state.on_unhandled( [&]( const std::string& method_name, const variants& args ){ + return this->receive_call( 0, method_name, args ); + }); + + _connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg); } ); + _connection.closed.connect( [this](){ closed(); } ); } virtual variant send_call( api_id_type api_id, - const string& method_name, - const variants& args = variants() ) override + string method_name, + variants args = variants() ) override { - auto request = _rpc_state.start_remote_call( "call", {api_id, method_name, args} ); - _connection->send_message( fc::json::to_string(request) ); + auto request = _rpc_state.start_remote_call( "call", {api_id, std::move(method_name), std::move(args) } ); + _connection.send_message( fc::json::to_string(request) ); return _rpc_state.wait_for_response( *request.id ); } + virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override + { + auto request = _rpc_state.start_remote_call( "callback", {callback_id, std::move(args) } ); + _connection.send_message( fc::json::to_string(request) ); + return _rpc_state.wait_for_response( *request.id ); + } + virtual void send_notice( uint64_t callback_id, variants args = variants() ) override + { + fc::rpc::request req{ optional(), "notice", {callback_id, std::move(args)}}; + _connection.send_message( fc::json::to_string(req) ); + } + protected: void on_message( const std::string& message ) - { - auto var = fc::json::from_string(message); - const auto& var_obj = var.get_object(); - if( var_obj.contains( "method" ) ) - { - auto call = var.as(); - try { - auto result = _rpc_state.local_call( call.method, call.params ); - if( call.id ) - { - _connection->send_message( fc::json::to_string( response( *call.id, result ) ) ); - } - } - catch ( const fc::exception& e ) + { + try { + auto var = fc::json::from_string(message); + const auto& var_obj = var.get_object(); + if( var_obj.contains( "method" ) ) { - if( call.id ) + auto call = var.as(); + try { + auto result = _rpc_state.local_call( call.method, call.params ); + if( call.id ) + { + _connection.send_message( fc::json::to_string( response( *call.id, result ) ) ); + } + } + catch ( const fc::exception& e ) { - _connection->send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + if( call.id ) + { + _connection.send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) ); + } } } - } - else - { - auto reply = var.as(); - _rpc_state.handle_reply( reply ); + else + { + auto reply = var.as(); + _rpc_state.handle_reply( reply ); + } + } catch ( const fc::exception& e ) { + wdump((e.to_detail_string())); } } - fc::http::websocket_connection_ptr _connection; - fc::rpc::state _rpc_state; + fc::http::websocket_connection& _connection; + fc::rpc::state _rpc_state; }; } } // namespace fc::rpc diff --git a/include/fc/variant.hpp b/include/fc/variant.hpp index 9772787..aff502c 100644 --- a/include/fc/variant.hpp +++ b/include/fc/variant.hpp @@ -41,6 +41,8 @@ namespace fc void from_variant( const variant& var, blob& vo ); template void to_variant( const safe& s, variant& v ); template void from_variant( const variant& v, safe& s ); + template void to_variant( const std::unique_ptr& s, variant& v ); + template void from_variant( const variant& v, std::unique_ptr& s ); template void to_variant( const static_variant& s, variant& v ); template void from_variant( const variant& v, static_variant& s ); @@ -502,6 +504,23 @@ namespace fc from_variant( var, *vo ); } } + template + void to_variant( const std::unique_ptr& var, variant& vo ) + { + if( var ) to_variant( *var, vo ); + else vo = nullptr; + } + + template + void from_variant( const variant& var, std::unique_ptr& vo ) + { + if( var.is_null() ) vo.reset(); + else if( vo ) from_variant( var, *vo ); + else { + vo.reset( new T() ); + from_variant( var, *vo ); + } + } template @@ -510,6 +529,7 @@ namespace fc template void from_variant( const variant& v, safe& s ) { s.value = v.as_uint64(); } + variant operator + ( const variant& a, const variant& b ); variant operator - ( const variant& a, const variant& b ); variant operator * ( const variant& a, const variant& b ); diff --git a/src/io/json.cpp b/src/io/json.cpp index 022df59..e453722 100644 --- a/src/io/json.cpp +++ b/src/io/json.cpp @@ -426,6 +426,7 @@ namespace fc return token_from_stream( in ); case 0x04: // ^D end of transmission case EOF: + case 0: FC_THROW_EXCEPTION( eof_exception, "unexpected end of file" ); default: FC_THROW_EXCEPTION( parse_error_exception, "Unexpected char '${c}' in \"${s}\"", @@ -454,6 +455,20 @@ namespace fc } } FC_RETHROW_EXCEPTIONS( warn, "", ("str",utf8_str) ) } + variants json::variants_from_string( const std::string& utf8_str, parse_type ptype ) + { try { + variants result; + fc::stringstream in( utf8_str ); + //in.exceptions( std::ifstream::eofbit ); + try { + while( true ) + { + // result.push_back( variant_from_stream( in )); + result.push_back(json_relaxed::variant_from_stream( in )); + } + } catch ( const fc::eof_exception& ){} + return result; + } FC_RETHROW_EXCEPTIONS( warn, "", ("str",utf8_str) ) } /* void toUTF8( const char str, ostream& os ) { diff --git a/src/network/http/websocket.cpp b/src/network/http/websocket.cpp index 740f603..5987027 100644 --- a/src/network/http/websocket.cpp +++ b/src/network/http/websocket.cpp @@ -73,7 +73,8 @@ namespace fc { namespace http { virtual void send_message( const std::string& message )override { - _ws_connection->send( message ); + auto ec = _ws_connection->send( message ); + FC_ASSERT( !ec, "websocket send failed: ${msg}", ("msg",ec.message() ) ); } virtual void close( int64_t code, const std::string& reason )override { @@ -158,13 +159,15 @@ namespace fc { namespace http { }).wait(); }); _client.set_close_handler( [=]( connection_hdl hdl ){ - _client_thread.async( [&](){ _connection.reset(); } ).wait(); + if( _connection ) + _client_thread.async( [&](){ if( _connection ) _connection->closed(); _connection.reset(); } ).wait(); if( _closed ) _closed->set_value(); }); _client.set_fail_handler( [=]( connection_hdl hdl ){ auto con = _client.get_con_from_hdl(hdl); auto message = con->get_ec().message(); - _client_thread.async( [&](){ _connection.reset(); } ).wait(); + if( _connection ) + _client_thread.async( [&](){ if( _connection ) _connection->closed(); _connection.reset(); } ).wait(); if( _connected && !_connected->ready() ) _connected->set_exception( exception_ptr( new FC_EXCEPTION( exception, "${message}", ("message",message)) ) ); if( _closed ) @@ -200,6 +203,10 @@ namespace fc { namespace http { { my->_server.listen(port); } + void websocket_server::listen( const fc::ip::endpoint& ep ) + { + my->_server.listen( boost::asio::ip::tcp::endpoint( boost::asio::ip::address_v4(uint32_t(ep.get_address())),ep.port()) ); + } void websocket_server::start_accept() { my->_server.start_accept(); diff --git a/src/rpc/state.cpp b/src/rpc/state.cpp index 4a67faa..7d4063d 100644 --- a/src/rpc/state.cpp +++ b/src/rpc/state.cpp @@ -21,6 +21,8 @@ void state::remove_method( const fc::string& name ) variant state::local_call( const string& method_name, const variants& args ) { auto method_itr = _methods.find(method_name); + if( method_itr == _methods.end() && _unhandled ) + return _unhandled( method_name, args ); FC_ASSERT( method_itr != _methods.end(), "Unknown Method: ${name}", ("name",method_name) ); return method_itr->second(args); } @@ -31,11 +33,12 @@ void state::handle_reply( const response& response ) FC_ASSERT( await != _awaiting.end(), "Unknown Response ID: ${id}", ("id",response.id)("response",response) ); if( response.result ) await->second->set_value( *response.result ); - else + else if( response.error ) { - FC_ASSERT( response.error ); - await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",*response.error) ) ) ); + await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",response.error->message)("data",response) ) ) ); } + else + await->second->set_value( fc::variant() ); _awaiting.erase(await); } @@ -57,5 +60,9 @@ void state::close() item.second->set_exception( fc::exception_ptr(new FC_EXCEPTION( eof_exception, "connection closed" )) ); _awaiting.clear(); } +void state::on_unhandled( const std::function& unhandled ) +{ + _unhandled = unhandled; +} } } // namespace fc::rpc diff --git a/tests/api.cpp b/tests/api.cpp index 440479c..533e52b 100644 --- a/tests/api.cpp +++ b/tests/api.cpp @@ -8,9 +8,10 @@ class calculator public: int32_t add( int32_t a, int32_t b ); // not implemented int32_t sub( int32_t a, int32_t b ); // not implemented + void on_result( const std::function& cb ); }; -FC_API( calculator, (add)(sub) ) +FC_API( calculator, (add)(sub)(on_result) ) class login_api @@ -22,7 +23,7 @@ class login_api return *calc; } fc::optional> calc; - std::set test( const std::string&, const std::string& ){}; + std::set test( const std::string&, const std::string& ) { return std::set(); } }; FC_API( login_api, (get_calc)(test) ); @@ -31,14 +32,18 @@ using namespace fc; class some_calculator { public: - int32_t add( int32_t a, int32_t b ) { return a+b; } - int32_t sub( int32_t a, int32_t b ) { return a-b; } + int32_t add( int32_t a, int32_t b ) { wlog("."); if( _cb ) _cb(a+b); return a+b; } + int32_t sub( int32_t a, int32_t b ) { wlog(".");if( _cb ) _cb(a-b); return a-b; } + void on_result( const std::function& cb ) { wlog( "set callback" ); _cb = cb; return ; } + std::function _cb; }; class variant_calculator { public: double add( fc::variant a, fc::variant b ) { return a.as_double()+b.as_double(); } double sub( fc::variant a, fc::variant b ) { return a.as_double()-b.as_double(); } + void on_result( const std::function& cb ) { wlog("set callback"); _cb = cb; return ; } + std::function _cb; }; using namespace fc::http; @@ -46,12 +51,12 @@ using namespace fc::rpc; int main( int argc, char** argv ) { - { + try { fc::api calc_api( std::make_shared() ); fc::http::websocket_server server; server.on_connection([&]( const websocket_connection_ptr& c ){ - auto wsc = std::make_shared(c); + auto wsc = std::make_shared(*c); auto login = std::make_shared(); login->calc = calc_api; wsc->register_api(fc::api(login)); @@ -66,9 +71,10 @@ int main( int argc, char** argv ) try { fc::http::websocket_client client; auto con = client.connect( "ws://localhost:8090" ); - auto apic = std::make_shared(con); + auto apic = std::make_shared(*con); auto remote_login_api = apic->get_remote_api(); auto remote_calc = remote_login_api->get_calc(); + remote_calc->on_result( []( uint32_t r ) { elog( "callback result ${r}", ("r",r) ); } ); wdump((remote_calc->add( 4, 5 ))); } catch ( const fc::exception& e ) { @@ -76,6 +82,10 @@ int main( int argc, char** argv ) } } wlog( "exit scope" ); + } + catch( const fc::exception& e ) + { + edump((e.to_detail_string())); } wlog( "returning now..." );