From 5dcb2ea992a07c51cadc686e0ea7431961020732 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Mon, 29 Oct 2012 17:57:34 -0400 Subject: [PATCH] updated json rpc to factor out stream connections --- include/fc/json_rpc_client.hpp | 15 +- include/fc/json_rpc_connection.hpp | 72 +++++---- include/fc/json_rpc_stream_connection.hpp | 35 +++++ src/bkup_json.cpp | 174 ---------------------- src/json_rpc_connection.cpp | 160 +++++++++++--------- src/json_rpc_stream_connection.cpp | 45 ++++++ tests/json_rpc_test.cpp | 4 +- 7 files changed, 225 insertions(+), 280 deletions(-) create mode 100644 include/fc/json_rpc_stream_connection.hpp delete mode 100644 src/bkup_json.cpp create mode 100644 src/json_rpc_stream_connection.cpp diff --git a/include/fc/json_rpc_client.hpp b/include/fc/json_rpc_client.hpp index 3db486e..cf3308e 100644 --- a/include/fc/json_rpc_client.hpp +++ b/include/fc/json_rpc_client.hpp @@ -15,8 +15,9 @@ namespace fc { namespace json { template \ static fc::function BOOST_PP_COMMA_IF(n) BOOST_PP_ENUM_PARAMS(n,A) > \ functor( P, R (C::*mem_func)(BOOST_PP_ENUM_PARAMS(n,A)) IS_CONST, \ - const rpc_connection& c = rpc_connection(), const char* name = nullptr ) { \ - return [=](BOOST_PP_ENUM_BINARY_PARAMS(n,A,a))->fc::future{ return c.invoke( name, make_tuple(BOOST_PP_ENUM_PARAMS(n,a)) ); }; \ + const rpc_connection::ptr& c = rpc_connection::ptr(), const char* name = nullptr ) { \ + return [=](BOOST_PP_ENUM_BINARY_PARAMS(n,A,a))->fc::future{ \ + return c->invoke( name, make_tuple(BOOST_PP_ENUM_PARAMS(n,a)) ); }; \ } BOOST_PP_REPEAT( 8, RPC_MEMBER_FUNCTOR, const ) BOOST_PP_REPEAT( 8, RPC_MEMBER_FUNCTOR, BOOST_PP_EMPTY() ) @@ -24,13 +25,13 @@ namespace fc { namespace json { }; struct vtable_visitor { - vtable_visitor( rpc_connection& c ):_con(c){} + vtable_visitor( rpc_connection::ptr& c ):_con(c){} template void operator()( const char* name, Function& memb, MemberPtr m )const { memb = rpc_member::functor( nullptr, m, _con, name ); } - rpc_connection& _con; + rpc_connection::ptr& _con; }; }; @@ -39,12 +40,12 @@ namespace fc { namespace json { class rpc_client : public ptr { public: rpc_client(){} - rpc_client( const rpc_connection& c ):_con(c){ + rpc_client( const rpc_connection::ptr& c ):_con(c){ init(); } rpc_client( const rpc_client& c ):_con(c._con){} - void set_connection( const rpc_connection& c ) { + void set_connection( const rpc_connection::ptr& c ) { _con = c; init(); } @@ -55,7 +56,7 @@ namespace fc { namespace json { this->_vtable.reset(new fc::detail::vtable() ); this->_vtable->template visit( fc::json::detail::vtable_visitor(_con) ); } - rpc_connection _con; + rpc_connection::ptr _con; }; } } // fc::json diff --git a/include/fc/json_rpc_connection.hpp b/include/fc/json_rpc_connection.hpp index 375bcbb..36e8f0b 100644 --- a/include/fc/json_rpc_connection.hpp +++ b/include/fc/json_rpc_connection.hpp @@ -1,6 +1,5 @@ #pragma once #include -#include #include #include @@ -31,56 +30,67 @@ namespace fc { namespace json { protected: ~pending_result_impl(){} }; - } + + struct server_method : public fc::retainable { + typedef fc::shared_ptr ptr; + virtual value call( const value& param ) = 0; + }; + template + struct server_method_impl : public server_method { + server_method_impl( const fc::function& a ):func(a){}; + virtual value call( const value& param ) { + return value( func( value_cast(param) ) ); + } + fc::function func; + }; + + } // namespace detail /** - * This class can be used to communicate via json-rpc over a pair of - * streams. - * - * @note rpc_connection has reference semantics and all 'copies' will - * refer to the same underlying stream. + * This is the base JSON RPC connection that handles the protocol + * level issues. It does not implement a transport which should + * be provided separately and use the handle_message and set_send_delegate + * methods to manage the protocol. */ - class rpc_connection { + class rpc_connection : public fc::retainable { public: rpc_connection(); - /** note the life of i and o must be longer than rpc_connection's life */ - rpc_connection( istream& i, ostream& o ); - rpc_connection( rpc_connection&& c ); - rpc_connection( const rpc_connection& c ); + rpc_connection(const rpc_connection&); + rpc_connection(rpc_connection&&); ~rpc_connection(); + rpc_connection& operator=(const rpc_connection&); + rpc_connection& operator=(rpc_connection&&); - rpc_connection& operator=(rpc_connection&& m); - rpc_connection& operator=(const rpc_connection& m); + typedef fc::shared_ptr ptr; - /** note the life of i and o must be longer than rpc_connection's life */ - void init( istream& i, ostream& o ); - - /* - template - future invoke( const fc::string& method ) { - auto r = new detail::pending_result_impl(); - invoke( detail::pending_result::ptr(r), method, value(make_tuple()) ); - return promise::ptr( r, true ); - } */ + void cancel_pending_requests(); template - future invoke( const fc::string& method, Args&& a = nullptr )const { + future invoke( const fc::string& method, Args&& a = nullptr ){ auto r = new detail::pending_result_impl(); - slog( "%p", r ); typename promise::ptr rtn( r, true ); invoke( detail::pending_result::ptr(r), method, value(fc::forward(a)) ); return rtn; } + template + void add_method( const fc::string& name, const fc::function& a ) { + this->add_method( name, detail::server_method::ptr(new detail::server_method_impl(a) ) ); + } + + protected: + void handle_message( const value& m ); + virtual void send_invoke( uint64_t id, const fc::string& m, value&& param ); + virtual void send_error( uint64_t id, int64_t code, const fc::string& msg ); + virtual void send_result( uint64_t id, value&& r ); + private: - void invoke( detail::pending_result::ptr&& p, const fc::string& m, const value& param )const; + void invoke( detail::pending_result::ptr&& p, const fc::string& m, value&& param ); + void add_method( const fc::string& name, detail::server_method::ptr&& m ); + class impl; fc::shared_ptr my; }; - - - } } // fc::json - diff --git a/include/fc/json_rpc_stream_connection.hpp b/include/fc/json_rpc_stream_connection.hpp new file mode 100644 index 0000000..2ece567 --- /dev/null +++ b/include/fc/json_rpc_stream_connection.hpp @@ -0,0 +1,35 @@ +#pragma once +#include + +namespace fc { + class istream; + class ostream; + + namespace json { + class rpc_stream_connection : public rpc_connection { + public: + typedef fc::shared_ptr ptr; + rpc_stream_connection( fc::istream&, fc::ostream& ); + + // the life of the streams must exceed the life of all copies + // of this rpc_stream_connection + void open( fc::istream&, fc::ostream& ); + + // cancels all pending requests, closes the ostream + // results on_close() being called if the stream is not already closed. + void close(); + + /** + * When the connection is closed, call the given method + */ + void on_close( const fc::function& ); + + protected: + virtual void send_invoke( uint64_t id, const fc::string& m, value&& param ); + virtual void send_error( uint64_t id, int64_t code, const fc::string& msg ); + virtual void send_result( uint64_t id, value&& r ); + private: + class impl; + fc::shared_ptr my; + }; +} } // fc::json diff --git a/src/bkup_json.cpp b/src/bkup_json.cpp deleted file mode 100644 index 89d4384..0000000 --- a/src/bkup_json.cpp +++ /dev/null @@ -1,174 +0,0 @@ -#include -#include -#include - -// TODO: replace sstream with light/fast compiling version -#include - -namespace fc { namespace json { - - class const_visitor : public fc::abstract_const_visitor { - public: - fc::ostream& out; - const_visitor( fc::ostream& o ):out(o){} - - virtual void visit()const{} - virtual void visit( const char& c )const{ out << '"' << c << '"'; } - virtual void visit( const uint8_t& c )const{ out << int(c); } - virtual void visit( const uint16_t& c )const{ out << c; } - virtual void visit( const uint32_t& c )const{ out << c; } - virtual void visit( const uint64_t& c )const{ out << c; } - virtual void visit( const int8_t& c )const{ out << int(c); } - virtual void visit( const int16_t& c )const{ out << c;} - virtual void visit( const int32_t& c )const{ out << c;} - virtual void visit( const int64_t& c )const{ out << c;} - virtual void visit( const double& c )const{ out << c;} - virtual void visit( const float& c )const{ out << c;} - virtual void visit( const bool& c )const{ out << (c?"true":"false"); } - virtual void visit( const fc::string& c )const{ out << '"'<= '0' && c <= '9' ) - return c - '0'; - if( c >= 'a' && c <= 'f' ) - return c - 'a' + 10; - if( c >= 'A' && c <= 'F' ) - return c - 'A' + 10; - return c; - } - - string escape_string( const string& s ) { - // calculate escape string size. - uint32_t ecount = 0; - for( auto i = s.begin(); i != s.end(); ++i ) { - if( ' '<= *i && *i <= '~' && *i !='\\' && *i != '"' ) { - ecount+=1; - } else { - switch( *i ) { - case '\t' : - case '\n' : - case '\r' : - case '\\' : - case '"' : - ecount += 2; break; - default: - ecount += 4; - } - } - } - // unless the size changed, just return it. - if( ecount == s.size() ) { return s; } - - // reserve the bytes - string out; out.reserve(ecount); - - // print it out. - for( auto i = s.begin(); i != s.end(); ++i ) { - if( ' '<= *i && *i <= '~' && *i !='\\' && *i != '"' ) { - out += *i; - } else { - out += '\\'; - switch( *i ) { - case '\t' : out += 't'; break; - case '\n' : out += 'n'; break; - case '\r' : out += 'r'; break; - case '\\' : out += '\\'; break; - case '"' : out += '"'; break; - default: - out += "x"; - const char* const hexdig = "0123456789abcdef"; - out += hexdig[*i >> 4]; - out += hexdig[*i & 0xF]; - } - } - } - return out; - } - string unescape_string( const string& s ) { - string out; out.reserve(s.size()); - for( auto i = s.begin(); i != s.end(); ++i ) { - if( *i != '\\' ) { - if( *i != '"' ) out += *i; - } - else { - ++i; - if( i == out.end() ) return out; - switch( *i ) { - case 't' : out += '\t'; break; - case 'n' : out += '\n'; break; - case 'r' : out += '\r'; break; - case '\\' : out += '\\'; break; - case '"' : out += '"'; break; - case 'x' : { - ++i; if( i == out.end() ) return out; - char c = from_hex(*i); - ++i; if( i == out.end() ) { out += c; return out; } - c = c<<4 | from_hex(*i); - out += c; - break; - } - default: - out += '\\'; - out += *i; - } - } - } - return out; - } - - - - -} } diff --git a/src/json_rpc_connection.cpp b/src/json_rpc_connection.cpp index 22a5c0f..bab15c5 100644 --- a/src/json_rpc_connection.cpp +++ b/src/json_rpc_connection.cpp @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include namespace fc { namespace json { @@ -18,65 +21,98 @@ namespace fc { namespace json { class rpc_connection::impl : public fc::retainable { public: - impl() - :_in(0),_out(0),_next_req_id(0){ } - ~impl(){ } - istream* _in; - ostream* _out; - int64_t _next_req_id; + impl():_next_req_id(0){ } + ~impl(){ cancel_pending_requests(); } + int64_t _next_req_id; + std::unordered_map _methods; + detail::pending_result::ptr _pr_head; detail::pending_result::ptr _pr_tail; - fc::future _read_loop_complete; - void read_loop() { - fc::string line; - fc::getline( *_in, line ); - while( !_in->eof() ) { - try { - fc::value v= fc::json::from_string( line ); - - auto id_itr = v.find( "id" ); - auto result_itr = v.find( "result" ); - if( id_itr != v.end() && result_itr != v.end() ) { - int id = value_cast(id_itr->val); - auto cur = _pr_head; - decltype(cur) prev; - while( cur ) { - if( cur->id == id ) { - if( prev ) prev->next = cur->next; - else _pr_head = cur->next; - if( !cur->next ) _pr_tail = cur->next; - - cur->handle_result( result_itr->val ); - break; - } - cur = cur->next; - } - } - } catch (...) { - wlog( "%s", fc::except_str().c_str() ); - } - fc::getline( *_in, line ); - } - slog( "Exit Read Loop, canceling waiting tasks!" ); - - auto cur = _pr_head; - while( cur ) { - cur->handle_error( "Connection Closed" ); - cur = cur->next; - } - - _pr_head.reset(); - _pr_tail.reset(); + void cancel_pending_requests() { + auto cur = _pr_head; + while( cur ) { + cur->set_exception( fc::copy_exception( fc::generic_exception("canceled") ) ); + cur = cur->next; + } + _pr_head.reset(); + _pr_tail.reset(); } }; + + + void rpc_connection::handle_message( const value& v ) { + auto id_itr = v.find( "id" ); + auto m_itr = v.find( "method" ); + auto end = v.end(); + + if( m_itr != end ) { + fc::string mname = value_cast(m_itr->val); + auto id = value_cast(id_itr->val); + auto smeth = my->_methods.find( mname ); + if( smeth == my->_methods.end() ) { + if( id_itr != end ) { + // TODO: send invalid method reply + send_error( id, -1, "Unknown method '"+mname+"'"); + } + // nothing to do, unknown method + } else { // known method, attempt to call it and send reply; + auto p_itr = v.find( "params" ); + + value nul; + const value& params = (p_itr != end) ? p_itr->val : nul; + + if( id_itr != end ) { // capture reply + try { + send_result( id, smeth->second->call(params) ); + } catch ( ... ) { + send_error( id, -1, fc::except_str() ); + } + } else { // ignore exception + result + try { smeth->second->call( params ); }catch(...){} + } + } + return; + } else if( id_itr != end ) { // we id but no method, therefore potential reply + int id = value_cast(id_itr->val); + auto cur = my->_pr_head; + decltype(cur) prev; + while( cur ) { + if( cur->id == id ) { + if( prev ) prev->next = cur->next; + else my->_pr_head = cur->next; + if( !cur->next ) my->_pr_tail = cur->next; + + try { + auto r_itr = v.find( "result" ); + if( r_itr != end ) { + cur->handle_result( r_itr->val ); + } else { + auto e_itr = v.find( "error" ); + if( e_itr != end ) { + cur->set_exception( + fc::copy_exception( + fc::generic_exception( + value_cast( + e_itr->val["message"] ) ) ) ); + } + } + } catch( ... ) { + cur->set_exception( fc::current_exception() ); + } + return; + } + cur = cur->next; + } + FC_THROW_MSG( "Unexpected reply with id %s", id ); + } + FC_THROW_MSG( "Method with no 'id' or 'method' field" ); + } + + rpc_connection::rpc_connection() :my( new impl() ){ } - rpc_connection::rpc_connection( istream& i, ostream& o ) - :my( new impl() ) - { - init( i, o ); - } + rpc_connection::rpc_connection( const rpc_connection& c ) :my(c.my){ } rpc_connection::rpc_connection( rpc_connection&& c ) { @@ -94,16 +130,12 @@ namespace fc { namespace json { return *this; } - void rpc_connection::init( istream& i, ostream& o ) { - my->_in = &i; - my->_out = &o; - my->_read_loop_complete = fc::async( [=](){ my->read_loop(); } ); + void rpc_connection::cancel_pending_requests() { + my->cancel_pending_requests(); } - void rpc_connection::invoke( detail::pending_result::ptr&& p, - const fc::string& m, const value& v )const { - - p->id = ++my->_next_req_id; + void rpc_connection::invoke( detail::pending_result::ptr&& p, + const fc::string& m, value&& param ) { if( my->_pr_tail ) { my->_pr_tail->next = p; my->_pr_tail = my->_pr_tail->next; @@ -111,13 +143,7 @@ namespace fc { namespace json { my->_pr_tail = p; my->_pr_head = my->_pr_tail; } - - ostream& out = *my->_out; - out << "{\"id\":"<id<<",\"method\":\""<_next_req_id, m, fc::move(param) ); } diff --git a/src/json_rpc_stream_connection.cpp b/src/json_rpc_stream_connection.cpp new file mode 100644 index 0000000..e8d58de --- /dev/null +++ b/src/json_rpc_stream_connection.cpp @@ -0,0 +1,45 @@ + + /** note the life of i and o must be longer than rpc_connection's life */ + rpc_connection( istream& i, ostream& o ); + + /** note the life of i and o must be longer than rpc_connection's life */ + void init( istream& i, ostream& o ); + + istream* _in; + ostream* _out; + + fc::future _read_loop_complete; + void read_loop() { + fc::string line; + fc::getline( *_in, line ); + while( !_in->eof() ) { + try { + fc::value v= fc::json::from_string( line ); + + } catch (...) { + wlog( "%s", fc::except_str().c_str() ); + } + fc::getline( *_in, line ); + } + slog( "Exit Read Loop, canceling waiting tasks!" ); + + auto cur = _pr_head; + while( cur ) { + cur->handle_error( "Connection Closed" ); + cur = cur->next; + } + + _pr_head.reset(); + _pr_tail.reset(); + } + + rpc_connection::rpc_connection( istream& i, ostream& o ) + :my( new impl() ) + { + init( i, o ); + } + void rpc_connection::init( istream& i, ostream& o ) { + my->_in = &i; + my->_out = &o; + my->_read_loop_complete = fc::async( [=](){ my->read_loop(); } ); + } diff --git a/tests/json_rpc_test.cpp b/tests/json_rpc_test.cpp index 91a04c0..911abb0 100644 --- a/tests/json_rpc_test.cpp +++ b/tests/json_rpc_test.cpp @@ -1,4 +1,6 @@ +#include #include +#include struct test { int add(int x){ return x+1; } @@ -18,7 +20,7 @@ FC_STUB( test, (add)(sub)(sub1)(sub2)(sub3)(sub4)(sub5)(sub6)(sub7)(sub8)(sub9) int main( int argc, char** argv ) { try { - fc::json::rpc_connection con(fc::cin,fc::cout); + fc::json::rpc_connection::ptr con( new fc::json::rpc_stream_connection( fc::cin, fc::cout ) ); fc::json::rpc_client c( con ); slog( "%d", c->add( 5 ).wait() );