updated json rpc to factor out stream connections

This commit is contained in:
Daniel Larimer 2012-10-29 17:57:34 -04:00
parent e8d4297f89
commit 5dcb2ea992
7 changed files with 225 additions and 280 deletions

View file

@ -15,8 +15,9 @@ namespace fc { namespace json {
template<typename R, typename C, typename P BOOST_PP_COMMA_IF(n) BOOST_PP_ENUM_PARAMS( n, typename A)> \
static fc::function<fc::future<R> BOOST_PP_COMMA_IF(n) BOOST_PP_ENUM_PARAMS(n,A) > \
functor( P, R (C::*mem_func)(BOOST_PP_ENUM_PARAMS(n,A)) IS_CONST, \
const rpc_connection& c = rpc_connection(), const char* name = nullptr ) { \
return [=](BOOST_PP_ENUM_BINARY_PARAMS(n,A,a))->fc::future<R>{ return c.invoke<R>( name, make_tuple(BOOST_PP_ENUM_PARAMS(n,a)) ); }; \
const rpc_connection::ptr& c = rpc_connection::ptr(), const char* name = nullptr ) { \
return [=](BOOST_PP_ENUM_BINARY_PARAMS(n,A,a))->fc::future<R>{ \
return c->invoke<R>( name, make_tuple(BOOST_PP_ENUM_PARAMS(n,a)) ); }; \
}
BOOST_PP_REPEAT( 8, RPC_MEMBER_FUNCTOR, const )
BOOST_PP_REPEAT( 8, RPC_MEMBER_FUNCTOR, BOOST_PP_EMPTY() )
@ -24,13 +25,13 @@ namespace fc { namespace json {
};
struct vtable_visitor {
vtable_visitor( rpc_connection& c ):_con(c){}
vtable_visitor( rpc_connection::ptr& c ):_con(c){}
template<typename Function, typename MemberPtr>
void operator()( const char* name, Function& memb, MemberPtr m )const {
memb = rpc_member::functor( nullptr, m, _con, name );
}
rpc_connection& _con;
rpc_connection::ptr& _con;
};
};
@ -39,12 +40,12 @@ namespace fc { namespace json {
class rpc_client : public ptr<InterfaceType,fc::json::detail::rpc_member> {
public:
rpc_client(){}
rpc_client( const rpc_connection& c ):_con(c){
rpc_client( const rpc_connection::ptr& c ):_con(c){
init();
}
rpc_client( const rpc_client& c ):_con(c._con){}
void set_connection( const rpc_connection& c ) {
void set_connection( const rpc_connection::ptr& c ) {
_con = c;
init();
}
@ -55,7 +56,7 @@ namespace fc { namespace json {
this->_vtable.reset(new fc::detail::vtable<InterfaceType,fc::json::detail::rpc_member>() );
this->_vtable->template visit<InterfaceType>( fc::json::detail::vtable_visitor(_con) );
}
rpc_connection _con;
rpc_connection::ptr _con;
};
} } // fc::json

View file

@ -1,6 +1,5 @@
#pragma once
#include <fc/json.hpp>
#include <fc/iostream.hpp>
#include <fc/future.hpp>
#include <fc/function.hpp>
@ -31,56 +30,67 @@ namespace fc { namespace json {
protected:
~pending_result_impl(){}
};
}
struct server_method : public fc::retainable {
typedef fc::shared_ptr<server_method> ptr;
virtual value call( const value& param ) = 0;
};
template<typename R, typename Args>
struct server_method_impl : public server_method {
server_method_impl( const fc::function<R,Args>& a ):func(a){};
virtual value call( const value& param ) {
return value( func( value_cast<Args>(param) ) );
}
fc::function<R,Args> func;
};
} // namespace detail
/**
* This class can be used to communicate via json-rpc over a pair of
* streams.
*
* @note rpc_connection has reference semantics and all 'copies' will
* refer to the same underlying stream.
* This is the base JSON RPC connection that handles the protocol
* level issues. It does not implement a transport which should
* be provided separately and use the handle_message and set_send_delegate
* methods to manage the protocol.
*/
class rpc_connection {
class rpc_connection : public fc::retainable {
public:
rpc_connection();
/** note the life of i and o must be longer than rpc_connection's life */
rpc_connection( istream& i, ostream& o );
rpc_connection( rpc_connection&& c );
rpc_connection( const rpc_connection& c );
rpc_connection(const rpc_connection&);
rpc_connection(rpc_connection&&);
~rpc_connection();
rpc_connection& operator=(const rpc_connection&);
rpc_connection& operator=(rpc_connection&&);
rpc_connection& operator=(rpc_connection&& m);
rpc_connection& operator=(const rpc_connection& m);
typedef fc::shared_ptr<rpc_connection> ptr;
/** note the life of i and o must be longer than rpc_connection's life */
void init( istream& i, ostream& o );
/*
template<typename R >
future<R> invoke( const fc::string& method ) {
auto r = new detail::pending_result_impl<R>();
invoke( detail::pending_result::ptr(r), method, value(make_tuple()) );
return promise<R>::ptr( r, true );
} */
void cancel_pending_requests();
template<typename R, typename Args >
future<R> invoke( const fc::string& method, Args&& a = nullptr )const {
future<R> invoke( const fc::string& method, Args&& a = nullptr ){
auto r = new detail::pending_result_impl<R>();
slog( "%p", r );
typename promise<R>::ptr rtn( r, true );
invoke( detail::pending_result::ptr(r), method, value(fc::forward<Args>(a)) );
return rtn;
}
template<typename R, typename Args >
void add_method( const fc::string& name, const fc::function<R,Args>& a ) {
this->add_method( name, detail::server_method::ptr(new detail::server_method_impl<R,Args>(a) ) );
}
protected:
void handle_message( const value& m );
virtual void send_invoke( uint64_t id, const fc::string& m, value&& param );
virtual void send_error( uint64_t id, int64_t code, const fc::string& msg );
virtual void send_result( uint64_t id, value&& r );
private:
void invoke( detail::pending_result::ptr&& p, const fc::string& m, const value& param )const;
void invoke( detail::pending_result::ptr&& p, const fc::string& m, value&& param );
void add_method( const fc::string& name, detail::server_method::ptr&& m );
class impl;
fc::shared_ptr<class impl> my;
};
} } // fc::json

View file

@ -0,0 +1,35 @@
#pragma once
#include <fc/json_rpc_connection.hpp>
namespace fc {
class istream;
class ostream;
namespace json {
class rpc_stream_connection : public rpc_connection {
public:
typedef fc::shared_ptr<rpc_stream_connection> ptr;
rpc_stream_connection( fc::istream&, fc::ostream& );
// the life of the streams must exceed the life of all copies
// of this rpc_stream_connection
void open( fc::istream&, fc::ostream& );
// cancels all pending requests, closes the ostream
// results on_close() being called if the stream is not already closed.
void close();
/**
* When the connection is closed, call the given method
*/
void on_close( const fc::function<void>& );
protected:
virtual void send_invoke( uint64_t id, const fc::string& m, value&& param );
virtual void send_error( uint64_t id, int64_t code, const fc::string& msg );
virtual void send_result( uint64_t id, value&& r );
private:
class impl;
fc::shared_ptr<impl> my;
};
} } // fc::json

View file

@ -1,174 +0,0 @@
#include <fc/json.hpp>
#include <fc/reflect.hpp>
#include <fc/stream.hpp>
// TODO: replace sstream with light/fast compiling version
#include <sstream>
namespace fc { namespace json {
class const_visitor : public fc::abstract_const_visitor {
public:
fc::ostream& out;
const_visitor( fc::ostream& o ):out(o){}
virtual void visit()const{}
virtual void visit( const char& c )const{ out << '"' << c << '"'; }
virtual void visit( const uint8_t& c )const{ out << int(c); }
virtual void visit( const uint16_t& c )const{ out << c; }
virtual void visit( const uint32_t& c )const{ out << c; }
virtual void visit( const uint64_t& c )const{ out << c; }
virtual void visit( const int8_t& c )const{ out << int(c); }
virtual void visit( const int16_t& c )const{ out << c;}
virtual void visit( const int32_t& c )const{ out << c;}
virtual void visit( const int64_t& c )const{ out << c;}
virtual void visit( const double& c )const{ out << c;}
virtual void visit( const float& c )const{ out << c;}
virtual void visit( const bool& c )const{ out << (c?"true":"false"); }
virtual void visit( const fc::string& c )const{ out << '"'<<c.c_str()<<'"'; }
virtual void visit( const char* member, int idx, int size, const cref& v)const{
if( !idx ) out <<"{";
out<<'"'<<member<<"\":";
v._reflector.visit( v._obj, *this );
if( idx != size-1 ) {
out <<',';
} else {
out << '}';
}
}
virtual void visit( int idx, int size, const cref& v)const{
if( !idx ) out << '[';
v._reflector.visit( v._obj, *this );
out << ((idx < (size -1) ) ? ',' : ']');
}
};
class visitor : public fc::abstract_visitor {
public:
virtual void visit(){}
virtual void visit( char& c )const{}
virtual void visit( uint8_t& c )const{}
virtual void visit( uint16_t& c )const{}
virtual void visit( uint32_t& c )const{}
virtual void visit( uint64_t& c )const{}
virtual void visit( int8_t& c )const{}
virtual void visit( int16_t& c )const{}
virtual void visit( int32_t& c )const{}
virtual void visit( int64_t& c )const{}
virtual void visit( double& c )const{}
virtual void visit( float& c )const{}
virtual void visit( bool& c )const{}
virtual void visit( fc::string& c )const{}
virtual void visit( const char* member, int idx, int size, const ref& v)const{}
virtual void visit( int idx, int size, const ref& v)const{}
};
string to_string( const cref& o ) {
std::stringstream ss;
{
fc::ostream os(ss);
o._reflector.visit( o._obj, fc::json::const_visitor(os) );
}
return ss.str().c_str();
}
void from_string( const string& s, const ref& o ) {
}
void write( ostream& out, const cref& val ) {
val._reflector.visit( val._obj, fc::json::const_visitor(out) );
}
uint8_t from_hex( char c ) {
if( c >= '0' && c <= '9' )
return c - '0';
if( c >= 'a' && c <= 'f' )
return c - 'a' + 10;
if( c >= 'A' && c <= 'F' )
return c - 'A' + 10;
return c;
}
string escape_string( const string& s ) {
// calculate escape string size.
uint32_t ecount = 0;
for( auto i = s.begin(); i != s.end(); ++i ) {
if( ' '<= *i && *i <= '~' && *i !='\\' && *i != '"' ) {
ecount+=1;
} else {
switch( *i ) {
case '\t' :
case '\n' :
case '\r' :
case '\\' :
case '"' :
ecount += 2; break;
default:
ecount += 4;
}
}
}
// unless the size changed, just return it.
if( ecount == s.size() ) { return s; }
// reserve the bytes
string out; out.reserve(ecount);
// print it out.
for( auto i = s.begin(); i != s.end(); ++i ) {
if( ' '<= *i && *i <= '~' && *i !='\\' && *i != '"' ) {
out += *i;
} else {
out += '\\';
switch( *i ) {
case '\t' : out += 't'; break;
case '\n' : out += 'n'; break;
case '\r' : out += 'r'; break;
case '\\' : out += '\\'; break;
case '"' : out += '"'; break;
default:
out += "x";
const char* const hexdig = "0123456789abcdef";
out += hexdig[*i >> 4];
out += hexdig[*i & 0xF];
}
}
}
return out;
}
string unescape_string( const string& s ) {
string out; out.reserve(s.size());
for( auto i = s.begin(); i != s.end(); ++i ) {
if( *i != '\\' ) {
if( *i != '"' ) out += *i;
}
else {
++i;
if( i == out.end() ) return out;
switch( *i ) {
case 't' : out += '\t'; break;
case 'n' : out += '\n'; break;
case 'r' : out += '\r'; break;
case '\\' : out += '\\'; break;
case '"' : out += '"'; break;
case 'x' : {
++i; if( i == out.end() ) return out;
char c = from_hex(*i);
++i; if( i == out.end() ) { out += c; return out; }
c = c<<4 | from_hex(*i);
out += c;
break;
}
default:
out += '\\';
out += *i;
}
}
}
return out;
}
} }

View file

@ -1,6 +1,9 @@
#include <fc/json_rpc_connection.hpp>
#include <fc/log.hpp>
#include <fc/thread.hpp>
#include <fc/error.hpp>
#include <unordered_map>
#include <string>
namespace fc { namespace json {
@ -18,65 +21,98 @@ namespace fc { namespace json {
class rpc_connection::impl : public fc::retainable {
public:
impl()
:_in(0),_out(0),_next_req_id(0){ }
~impl(){ }
istream* _in;
ostream* _out;
int64_t _next_req_id;
impl():_next_req_id(0){ }
~impl(){ cancel_pending_requests(); }
int64_t _next_req_id;
std::unordered_map<std::string,detail::server_method::ptr> _methods;
detail::pending_result::ptr _pr_head;
detail::pending_result::ptr _pr_tail;
fc::future<void> _read_loop_complete;
void read_loop() {
fc::string line;
fc::getline( *_in, line );
while( !_in->eof() ) {
try {
fc::value v= fc::json::from_string( line );
auto id_itr = v.find( "id" );
auto result_itr = v.find( "result" );
if( id_itr != v.end() && result_itr != v.end() ) {
int id = value_cast<int64_t>(id_itr->val);
auto cur = _pr_head;
decltype(cur) prev;
while( cur ) {
if( cur->id == id ) {
if( prev ) prev->next = cur->next;
else _pr_head = cur->next;
if( !cur->next ) _pr_tail = cur->next;
cur->handle_result( result_itr->val );
break;
}
cur = cur->next;
}
}
} catch (...) {
wlog( "%s", fc::except_str().c_str() );
}
fc::getline( *_in, line );
}
slog( "Exit Read Loop, canceling waiting tasks!" );
auto cur = _pr_head;
while( cur ) {
cur->handle_error( "Connection Closed" );
cur = cur->next;
}
_pr_head.reset();
_pr_tail.reset();
void cancel_pending_requests() {
auto cur = _pr_head;
while( cur ) {
cur->set_exception( fc::copy_exception( fc::generic_exception("canceled") ) );
cur = cur->next;
}
_pr_head.reset();
_pr_tail.reset();
}
};
void rpc_connection::handle_message( const value& v ) {
auto id_itr = v.find( "id" );
auto m_itr = v.find( "method" );
auto end = v.end();
if( m_itr != end ) {
fc::string mname = value_cast<fc::string>(m_itr->val);
auto id = value_cast<uint64_t>(id_itr->val);
auto smeth = my->_methods.find( mname );
if( smeth == my->_methods.end() ) {
if( id_itr != end ) {
// TODO: send invalid method reply
send_error( id, -1, "Unknown method '"+mname+"'");
}
// nothing to do, unknown method
} else { // known method, attempt to call it and send reply;
auto p_itr = v.find( "params" );
value nul;
const value& params = (p_itr != end) ? p_itr->val : nul;
if( id_itr != end ) { // capture reply
try {
send_result( id, smeth->second->call(params) );
} catch ( ... ) {
send_error( id, -1, fc::except_str() );
}
} else { // ignore exception + result
try { smeth->second->call( params ); }catch(...){}
}
}
return;
} else if( id_itr != end ) { // we id but no method, therefore potential reply
int id = value_cast<int64_t>(id_itr->val);
auto cur = my->_pr_head;
decltype(cur) prev;
while( cur ) {
if( cur->id == id ) {
if( prev ) prev->next = cur->next;
else my->_pr_head = cur->next;
if( !cur->next ) my->_pr_tail = cur->next;
try {
auto r_itr = v.find( "result" );
if( r_itr != end ) {
cur->handle_result( r_itr->val );
} else {
auto e_itr = v.find( "error" );
if( e_itr != end ) {
cur->set_exception(
fc::copy_exception(
fc::generic_exception(
value_cast<fc::string>(
e_itr->val["message"] ) ) ) );
}
}
} catch( ... ) {
cur->set_exception( fc::current_exception() );
}
return;
}
cur = cur->next;
}
FC_THROW_MSG( "Unexpected reply with id %s", id );
}
FC_THROW_MSG( "Method with no 'id' or 'method' field" );
}
rpc_connection::rpc_connection()
:my( new impl() ){ }
rpc_connection::rpc_connection( istream& i, ostream& o )
:my( new impl() )
{
init( i, o );
}
rpc_connection::rpc_connection( const rpc_connection& c )
:my(c.my){ }
rpc_connection::rpc_connection( rpc_connection&& c ) {
@ -94,16 +130,12 @@ namespace fc { namespace json {
return *this;
}
void rpc_connection::init( istream& i, ostream& o ) {
my->_in = &i;
my->_out = &o;
my->_read_loop_complete = fc::async( [=](){ my->read_loop(); } );
void rpc_connection::cancel_pending_requests() {
my->cancel_pending_requests();
}
void rpc_connection::invoke( detail::pending_result::ptr&& p,
const fc::string& m, const value& v )const {
p->id = ++my->_next_req_id;
void rpc_connection::invoke( detail::pending_result::ptr&& p,
const fc::string& m, value&& param ) {
if( my->_pr_tail ) {
my->_pr_tail->next = p;
my->_pr_tail = my->_pr_tail->next;
@ -111,13 +143,7 @@ namespace fc { namespace json {
my->_pr_tail = p;
my->_pr_head = my->_pr_tail;
}
ostream& out = *my->_out;
out << "{\"id\":"<<p->id<<",\"method\":\""<<escape_string(m);
out <<"\",\"params\":";
fc::json::write( out, v );
out<<"}\n";
out.flush();
send_invoke( ++my->_next_req_id, m, fc::move(param) );
}

View file

@ -0,0 +1,45 @@
/** note the life of i and o must be longer than rpc_connection's life */
rpc_connection( istream& i, ostream& o );
/** note the life of i and o must be longer than rpc_connection's life */
void init( istream& i, ostream& o );
istream* _in;
ostream* _out;
fc::future<void> _read_loop_complete;
void read_loop() {
fc::string line;
fc::getline( *_in, line );
while( !_in->eof() ) {
try {
fc::value v= fc::json::from_string( line );
} catch (...) {
wlog( "%s", fc::except_str().c_str() );
}
fc::getline( *_in, line );
}
slog( "Exit Read Loop, canceling waiting tasks!" );
auto cur = _pr_head;
while( cur ) {
cur->handle_error( "Connection Closed" );
cur = cur->next;
}
_pr_head.reset();
_pr_tail.reset();
}
rpc_connection::rpc_connection( istream& i, ostream& o )
:my( new impl() )
{
init( i, o );
}
void rpc_connection::init( istream& i, ostream& o ) {
my->_in = &i;
my->_out = &o;
my->_read_loop_complete = fc::async( [=](){ my->read_loop(); } );
}

View file

@ -1,4 +1,6 @@
#include <fc/json_rpc_stream_connection.hpp>
#include <fc/json_rpc_client.hpp>
#include <fc/iostream.hpp>
struct test {
int add(int x){ return x+1; }
@ -18,7 +20,7 @@ FC_STUB( test, (add)(sub)(sub1)(sub2)(sub3)(sub4)(sub5)(sub6)(sub7)(sub8)(sub9)
int main( int argc, char** argv ) {
try {
fc::json::rpc_connection con(fc::cin,fc::cout);
fc::json::rpc_connection::ptr con( new fc::json::rpc_stream_connection( fc::cin, fc::cout ) );
fc::json::rpc_client<test> c( con );
slog( "%d", c->add( 5 ).wait() );