adding json-rpc client/connection

This commit is contained in:
Daniel Larimer 2012-10-26 01:03:01 -04:00
parent 632bc71c2a
commit ff226f9df4
3 changed files with 183 additions and 55 deletions

View file

@ -0,0 +1,58 @@
#pragma once
#include <fc/actor.hpp>
#include <fc/json_rpc_connection.hpp>
namespace fc { namespace json {
namespace detail {
struct rpc_member {
// TODO: expand for all method arity and constness....
template<typename R, typename C, typename A1, typename P>
static fc::function<fc::future<R>,A1> functor( P,
R (C::*mem_func)(A1),
const rpc_connection& c = rpc_connection(),
const char* name = nullptr
) {
return [=](A1 a1)->fc::future<R>{ return c.invoke<R>( name, make_tuple(a1) ); };
}
};
struct vtable_visitor {
vtable_visitor( rpc_connection& c ):_con(c){}
template<typename Function, typename MemberPtr>
void operator()( const char* name, Function& memb, MemberPtr m )const {
memb = rpc_member::functor( nullptr, m, _con, name );
}
rpc_connection& _con;
};
};
template<typename InterfaceType>
class rpc_client : public ptr<InterfaceType,fc::json::detail::rpc_member> {
public:
rpc_client(){}
rpc_client( const rpc_connection& c ):_con(c){
init();
}
rpc_client( const rpc_client& c ):_con(c._con){}
void set_connection( const rpc_connection& c ) {
_con = c;
init();
}
const rpc_connection& connection()const { return _con; }
private:
void init() {
this->_vtable.reset(new fc::detail::vtable<InterfaceType,fc::json::detail::rpc_member>() );
this->_vtable->template visit<InterfaceType>( fc::json::detail::vtable_visitor(_con) );
}
rpc_connection _con;
};
} } // fc::json

View file

@ -1,15 +1,14 @@
#ifndef _JSON_RPC_CONNECTION_HPP_
#define _JSON_RPC_CONNECTION_HPP_
#include <fc/reflect_ptr.hpp>
#pragma once
#include <fc/json.hpp>
#include <fc/stream.hpp>
#include <fc/iostream.hpp>
#include <fc/future.hpp>
#include <fc/function.hpp>
namespace fc { namespace json {
namespace detail {
struct pending_result : virtual public promise_base {
typedef shared_ptr<pending_result> ptr;
virtual void handle_result( const fc::string& ) = 0;
virtual void handle_result( const fc::value& ) = 0;
void handle_error( const fc::string& );
int64_t id;
pending_result::ptr next;
@ -18,15 +17,15 @@ namespace fc { namespace json {
};
template<typename T>
struct pending_result_impl : virtual public promise<T>, virtual public pending_result {
virtual void handle_result( const fc::string& s ) {
set_value( fc::json::from_string<T>(s) );
virtual void handle_result( const fc::value& s ) {
this->set_value( value_cast<T>(s) );
}
protected:
~pending_result_impl(){}
};
template<>
struct pending_result_impl<void> : virtual public promise<void>, virtual public pending_result {
virtual void handle_result( const fc::string& ) {
virtual void handle_result( const fc::value& ) {
set_value();
}
protected:
@ -35,43 +34,53 @@ namespace fc { namespace json {
}
/**
This class is designed to be used like this:
@code
class my_api {
future<int> function( const string& arg, int arg2 ) {
_con->invoke<int>( "function", {&arg,&arg2} );
}
private:
rpc_connection* _con;
};
@endcode
* This class can be used to communicate via json-rpc over a pair of
* streams.
*
* @note rpc_connection has reference semantics and all 'copies' will
* refer to the same underlying stream.
*/
class rpc_connection : virtual public retainable {
class rpc_connection {
public:
rpc_connection();
/** note the life of i and o must be longer than rpc_connection's life */
rpc_connection( istream& i, ostream& o );
rpc_connection( rpc_connection&& c );
rpc_connection( const rpc_connection& c );
~rpc_connection();
rpc_connection& operator=(rpc_connection&& m);
rpc_connection& operator=(const rpc_connection& m);
/** note the life of i and o must be longer than rpc_connection's life */
void init( istream& i, ostream& o );
template<typename R, uint16_t N>
future<R> invoke( const fc::string& method, const cptr (&params)[N] ) {
/*
template<typename R >
future<R> invoke( const fc::string& method ) {
auto r = new detail::pending_result_impl<R>();
invoke( detail::pending_result::ptr(r), method, N, params );
invoke( detail::pending_result::ptr(r), method, value(make_tuple()) );
return promise<R>::ptr( r, true );
} */
template<typename R, typename Args >
future<R> invoke( const fc::string& method, Args&& a = nullptr )const {
auto r = new detail::pending_result_impl<R>();
slog( "%p", r );
typename promise<R>::ptr rtn( r, true );
invoke( detail::pending_result::ptr(r), method, value(fc::forward<Args>(a)) );
return rtn;
}
private:
void invoke( detail::pending_result::ptr&& p, const fc::string& m,
uint16_t nparam, const cptr* param );
class rpc_connection_d* my;
void invoke( detail::pending_result::ptr&& p, const fc::string& m, const value& param )const;
class impl;
fc::shared_ptr<class impl> my;
};
} } // fc::json
#endif // _JSON_RPC_CONNECTION_HPP_

View file

@ -1,32 +1,94 @@
#include <fc/json_rpc_connection.hpp>
#include <fc/log.hpp>
#include <fc/thread.hpp>
namespace fc { namespace json {
class rpc_connection_d {
namespace detail {
void pending_result::handle_error( const fc::string& e ) {
try {
FC_THROW_MSG( "%s", e );
} catch ( ... ) {
set_exception( fc::current_exception() );
}
}
}
class rpc_connection::impl : public fc::retainable {
public:
rpc_connection_d()
:_in(0),_out(0),_next_req_id(0){}
impl()
:_in(0),_out(0),_next_req_id(0){ }
~impl(){ }
istream* _in;
ostream* _out;
int64_t _next_req_id;
detail::pending_result::ptr _pr_head;
detail::pending_result::ptr _pr_tail;
fc::future<void> _read_loop_complete;
void read_loop() {
fc::string line;
fc::getline( *_in, line );
while( !_in->eof() ) {
try {
fc::value v= fc::json::from_string( line );
auto id_itr = v.find( "id" );
auto result_itr = v.find( "result" );
if( id_itr != v.end() && result_itr != v.end() ) {
int id = value_cast<int64_t>(id_itr->val);
auto cur = _pr_head;
decltype(cur) prev;
while( cur ) {
if( cur->id == id ) {
if( prev ) prev->next = cur->next;
else _pr_head = cur->next;
if( !cur->next ) _pr_tail = cur->next;
cur->handle_result( result_itr->val );
break;
}
cur = cur->next;
}
}
} catch (...) {
wlog( "%s", fc::except_str().c_str() );
}
fc::getline( *_in, line );
}
slog( "Exit Read Loop, canceling waiting tasks!" );
auto cur = _pr_head;
while( cur ) {
cur->handle_error( "Connection Closed" );
cur = cur->next;
}
_pr_head.reset();
_pr_tail.reset();
}
};
rpc_connection::rpc_connection() {
my = new rpc_connection_d();
}
rpc_connection::rpc_connection( istream& i, ostream& o ) {
my = new rpc_connection_d();
rpc_connection::rpc_connection()
:my( new impl() ){ }
rpc_connection::rpc_connection( istream& i, ostream& o )
:my( new impl() )
{
init( i, o );
}
rpc_connection::rpc_connection( rpc_connection&& c )
:my(c.my) {
c.my = 0;
rpc_connection::rpc_connection( const rpc_connection& c )
:my(c.my){ }
rpc_connection::rpc_connection( rpc_connection&& c ) {
fc::swap(my,c.my);
}
rpc_connection::~rpc_connection() {
delete my;
}
rpc_connection& rpc_connection::operator=(const rpc_connection& m) {
my= m.my;
return *this;
}
rpc_connection& rpc_connection::operator=(rpc_connection&& m) {
fc::swap(m.my,my);
return *this;
@ -35,26 +97,25 @@ namespace fc { namespace json {
void rpc_connection::init( istream& i, ostream& o ) {
my->_in = &i;
my->_out = &o;
my->_read_loop_complete = fc::async( [=](){ my->read_loop(); } );
}
void rpc_connection::invoke( detail::pending_result::ptr&& p, const fc::string& m,
uint16_t nparam, const cptr* param ) {
void rpc_connection::invoke( detail::pending_result::ptr&& p,
const fc::string& m, const value& v )const {
p->id = ++my->_next_req_id;
my->_pr_tail->next = fc::move(p);
my->_pr_tail = my->_pr_tail->next;
if( my->_pr_tail ) {
my->_pr_tail->next = p;
my->_pr_tail = my->_pr_tail->next;
} else {
my->_pr_tail = p;
my->_pr_head = my->_pr_tail;
}
ostream& out = *my->_out;
out << "{\"id\":"<<p->id<<",\"method\":"<<escape_string(m);
if( nparam > 0 ) {
out <<",\"params\":[";
uint16_t back = nparam -1;
for( uint16_t i = 0; i < back; ++i ) {
fc::json::write( out, *(param[i]) );
out <<',';
}
fc::json::write( out, *(param[back]) );
out<<']';
}
out << "{\"id\":"<<p->id<<",\"method\":\""<<escape_string(m);
out <<"\",\"params\":";
fc::json::write( out, v );
out<<"}\n";
out.flush();
}