diff --git a/include/fc/json_rpc_client.hpp b/include/fc/json_rpc_client.hpp new file mode 100644 index 0000000..fb59fb9 --- /dev/null +++ b/include/fc/json_rpc_client.hpp @@ -0,0 +1,58 @@ +#pragma once +#include +#include + + +namespace fc { namespace json { + + namespace detail { + struct rpc_member { + // TODO: expand for all method arity and constness.... + template + static fc::function,A1> functor( P, + R (C::*mem_func)(A1), + const rpc_connection& c = rpc_connection(), + const char* name = nullptr + ) { + return [=](A1 a1)->fc::future{ return c.invoke( name, make_tuple(a1) ); }; + } + }; + + + struct vtable_visitor { + vtable_visitor( rpc_connection& c ):_con(c){} + + template + void operator()( const char* name, Function& memb, MemberPtr m )const { + memb = rpc_member::functor( nullptr, m, _con, name ); + } + rpc_connection& _con; + }; + + }; + + + template + class rpc_client : public ptr { + public: + rpc_client(){} + rpc_client( const rpc_connection& c ):_con(c){ + init(); + } + rpc_client( const rpc_client& c ):_con(c._con){} + + void set_connection( const rpc_connection& c ) { + _con = c; + init(); + } + const rpc_connection& connection()const { return _con; } + + private: + void init() { + this->_vtable.reset(new fc::detail::vtable() ); + this->_vtable->template visit( fc::json::detail::vtable_visitor(_con) ); + } + rpc_connection _con; + }; + +} } // fc::json diff --git a/include/fc/json_rpc_connection.hpp b/include/fc/json_rpc_connection.hpp index c473e79..375bcbb 100644 --- a/include/fc/json_rpc_connection.hpp +++ b/include/fc/json_rpc_connection.hpp @@ -1,15 +1,14 @@ -#ifndef _JSON_RPC_CONNECTION_HPP_ -#define _JSON_RPC_CONNECTION_HPP_ -#include +#pragma once #include -#include +#include #include +#include namespace fc { namespace json { namespace detail { struct pending_result : virtual public promise_base { typedef shared_ptr ptr; - virtual void handle_result( const fc::string& ) = 0; + virtual void handle_result( const fc::value& ) = 0; void handle_error( const fc::string& ); int64_t id; pending_result::ptr next; @@ -18,15 +17,15 @@ namespace fc { namespace json { }; template struct pending_result_impl : virtual public promise, virtual public pending_result { - virtual void handle_result( const fc::string& s ) { - set_value( fc::json::from_string(s) ); + virtual void handle_result( const fc::value& s ) { + this->set_value( value_cast(s) ); } protected: ~pending_result_impl(){} }; template<> struct pending_result_impl : virtual public promise, virtual public pending_result { - virtual void handle_result( const fc::string& ) { + virtual void handle_result( const fc::value& ) { set_value(); } protected: @@ -35,43 +34,53 @@ namespace fc { namespace json { } /** - This class is designed to be used like this: - @code - class my_api { - - future function( const string& arg, int arg2 ) { - _con->invoke( "function", {&arg,&arg2} ); - } - private: - rpc_connection* _con; - }; - @endcode - + * This class can be used to communicate via json-rpc over a pair of + * streams. + * + * @note rpc_connection has reference semantics and all 'copies' will + * refer to the same underlying stream. */ - class rpc_connection : virtual public retainable { + class rpc_connection { public: rpc_connection(); + /** note the life of i and o must be longer than rpc_connection's life */ rpc_connection( istream& i, ostream& o ); rpc_connection( rpc_connection&& c ); + rpc_connection( const rpc_connection& c ); ~rpc_connection(); rpc_connection& operator=(rpc_connection&& m); + rpc_connection& operator=(const rpc_connection& m); + /** note the life of i and o must be longer than rpc_connection's life */ void init( istream& i, ostream& o ); - template - future invoke( const fc::string& method, const cptr (¶ms)[N] ) { + /* + template + future invoke( const fc::string& method ) { auto r = new detail::pending_result_impl(); - invoke( detail::pending_result::ptr(r), method, N, params ); + invoke( detail::pending_result::ptr(r), method, value(make_tuple()) ); return promise::ptr( r, true ); + } */ + + template + future invoke( const fc::string& method, Args&& a = nullptr )const { + auto r = new detail::pending_result_impl(); + slog( "%p", r ); + typename promise::ptr rtn( r, true ); + invoke( detail::pending_result::ptr(r), method, value(fc::forward(a)) ); + return rtn; } private: - void invoke( detail::pending_result::ptr&& p, const fc::string& m, - uint16_t nparam, const cptr* param ); - class rpc_connection_d* my; + void invoke( detail::pending_result::ptr&& p, const fc::string& m, const value& param )const; + class impl; + fc::shared_ptr my; }; + + + + } } // fc::json -#endif // _JSON_RPC_CONNECTION_HPP_ diff --git a/src/json_rpc_connection.cpp b/src/json_rpc_connection.cpp index 3013900..22a5c0f 100644 --- a/src/json_rpc_connection.cpp +++ b/src/json_rpc_connection.cpp @@ -1,32 +1,94 @@ #include +#include +#include namespace fc { namespace json { - class rpc_connection_d { + namespace detail { + + void pending_result::handle_error( const fc::string& e ) { + try { + FC_THROW_MSG( "%s", e ); + } catch ( ... ) { + set_exception( fc::current_exception() ); + } + } + + } + + class rpc_connection::impl : public fc::retainable { public: - rpc_connection_d() - :_in(0),_out(0),_next_req_id(0){} + impl() + :_in(0),_out(0),_next_req_id(0){ } + ~impl(){ } istream* _in; ostream* _out; int64_t _next_req_id; detail::pending_result::ptr _pr_head; detail::pending_result::ptr _pr_tail; + + fc::future _read_loop_complete; + void read_loop() { + fc::string line; + fc::getline( *_in, line ); + while( !_in->eof() ) { + try { + fc::value v= fc::json::from_string( line ); + + auto id_itr = v.find( "id" ); + auto result_itr = v.find( "result" ); + if( id_itr != v.end() && result_itr != v.end() ) { + int id = value_cast(id_itr->val); + auto cur = _pr_head; + decltype(cur) prev; + while( cur ) { + if( cur->id == id ) { + if( prev ) prev->next = cur->next; + else _pr_head = cur->next; + if( !cur->next ) _pr_tail = cur->next; + + cur->handle_result( result_itr->val ); + break; + } + cur = cur->next; + } + } + } catch (...) { + wlog( "%s", fc::except_str().c_str() ); + } + fc::getline( *_in, line ); + } + slog( "Exit Read Loop, canceling waiting tasks!" ); + + auto cur = _pr_head; + while( cur ) { + cur->handle_error( "Connection Closed" ); + cur = cur->next; + } + + _pr_head.reset(); + _pr_tail.reset(); + } }; - rpc_connection::rpc_connection() { - my = new rpc_connection_d(); - } - rpc_connection::rpc_connection( istream& i, ostream& o ) { - my = new rpc_connection_d(); + rpc_connection::rpc_connection() + :my( new impl() ){ } + rpc_connection::rpc_connection( istream& i, ostream& o ) + :my( new impl() ) + { init( i, o ); } - rpc_connection::rpc_connection( rpc_connection&& c ) - :my(c.my) { - c.my = 0; + rpc_connection::rpc_connection( const rpc_connection& c ) + :my(c.my){ } + rpc_connection::rpc_connection( rpc_connection&& c ) { + fc::swap(my,c.my); } rpc_connection::~rpc_connection() { - delete my; } + rpc_connection& rpc_connection::operator=(const rpc_connection& m) { + my= m.my; + return *this; + } rpc_connection& rpc_connection::operator=(rpc_connection&& m) { fc::swap(m.my,my); return *this; @@ -35,26 +97,25 @@ namespace fc { namespace json { void rpc_connection::init( istream& i, ostream& o ) { my->_in = &i; my->_out = &o; + my->_read_loop_complete = fc::async( [=](){ my->read_loop(); } ); } - void rpc_connection::invoke( detail::pending_result::ptr&& p, const fc::string& m, - uint16_t nparam, const cptr* param ) { + void rpc_connection::invoke( detail::pending_result::ptr&& p, + const fc::string& m, const value& v )const { + p->id = ++my->_next_req_id; - my->_pr_tail->next = fc::move(p); - my->_pr_tail = my->_pr_tail->next; + if( my->_pr_tail ) { + my->_pr_tail->next = p; + my->_pr_tail = my->_pr_tail->next; + } else { + my->_pr_tail = p; + my->_pr_head = my->_pr_tail; + } ostream& out = *my->_out; - out << "{\"id\":"<id<<",\"method\":"< 0 ) { - out <<",\"params\":["; - uint16_t back = nparam -1; - for( uint16_t i = 0; i < back; ++i ) { - fc::json::write( out, *(param[i]) ); - out <<','; - } - fc::json::write( out, *(param[back]) ); - out<<']'; - } + out << "{\"id\":"<id<<",\"method\":\""<