json rpc updates, tcp server/client support

This commit is contained in:
Daniel Larimer 2012-11-05 23:34:58 -05:00
parent 6b14a176d0
commit a6541b825a
17 changed files with 292 additions and 55 deletions

View file

@ -50,6 +50,8 @@ set( sources
src/http_connection.cpp
src/json_rpc_connection.cpp
src/json_rpc_stream_connection.cpp
src/json_rpc_tcp_connection.cpp
src/json_rpc_tcp_server.cpp
src/value.cpp
src/lexical_cast.cpp
src/spin_lock.cpp

View file

@ -25,8 +25,7 @@ class function {
function& operator=( const function& c ) { func = c.func; return *this; }
function& operator=( function&& c ) { fc::swap(func,c.func); return *this; }
template<typename... Args2>
R operator()( Args2... args2)const { return func->call(fc::forward<Args2>(args2)...); }
R operator()( Args... args)const { return func->call(args...); }
bool operator!()const { return !func; }
@ -34,7 +33,7 @@ class function {
struct impl_base : public fc::retainable {
virtual ~impl_base(){}
virtual R call(Args...) = 0;
virtual R call(Args...)const = 0;
};
template<typename Functor>
@ -42,7 +41,7 @@ class function {
template<typename U>
impl( U&& u ):func( fc::forward<U>(u) ){}
virtual R call(Args... args) { return func(args...); }
virtual R call(Args... args)const { return func(args...); }
Functor func;
};

View file

@ -54,7 +54,7 @@ namespace fc { namespace json {
private:
void init() {
this->_vtable.reset(new fc::detail::vtable<InterfaceType,fc::json::detail::rpc_member>() );
this->_vtable->template visit<InterfaceType>( fc::json::detail::vtable_visitor(_con) );
this->_vtable->template visit_other<InterfaceType>( fc::json::detail::vtable_visitor(_con) );
}
rpc_connection::ptr _con;
};

View file

@ -52,10 +52,10 @@ namespace fc { namespace json {
template<typename InterfaceType>
struct add_method_visitor {
public:
add_method_visitor( const fc::ptr<InterfaceType>& p, fc::json::rpc_connection& c ):_ptr(p){}
add_method_visitor( const fc::ptr<InterfaceType>& p, fc::json::rpc_connection& c ):_ptr(p),_con(c){}
template<typename R, typename Args, typename Type>
void operator()( const char* name, fc::function<R,Args>& meth, Type );
template<typename R, typename Args>
void operator()( const char* name, fc::function<R,Args>& meth);
const fc::ptr<InterfaceType>& _ptr;
fc::json::rpc_connection& _con;
@ -111,6 +111,8 @@ namespace fc { namespace json {
private:
void invoke( detail::pending_result::ptr&& p, const fc::string& m, value&& param );
void add_method( const fc::string& name, rpc_server_method::ptr&& m );
template<typename InterfaceType>
friend struct detail::add_method_visitor;
class impl;
fc::shared_ptr<class impl> my;
@ -119,8 +121,8 @@ namespace fc { namespace json {
namespace detail {
template<typename InterfaceType>
template<typename R, typename Args, typename Type>
void add_method_visitor<InterfaceType>::operator()( const char* name, fc::function<R,Args>& meth, Type ) {
template<typename R, typename Args>
void add_method_visitor<InterfaceType>::operator()( const char* name, fc::function<R,Args>& meth) {
_con.add_method( name, rpc_server_method::ptr( new rpc_server_method_impl<R,Args>(meth) ) );
}

View file

@ -10,6 +10,8 @@ namespace fc {
public:
typedef fc::shared_ptr<rpc_stream_connection> ptr;
rpc_stream_connection( fc::istream&, fc::ostream& );
rpc_stream_connection(const rpc_stream_connection& );
rpc_stream_connection();
// the life of the streams must exceed the life of all copies
// of this rpc_stream_connection
@ -17,7 +19,7 @@ namespace fc {
// cancels all pending requests, closes the ostream
// results on_close() being called if the stream is not already closed.
void close();
virtual void close();
/**
* When the connection is closed, call the given method
@ -25,9 +27,11 @@ namespace fc {
void on_close( const fc::function<void>& );
protected:
~rpc_stream_connection();
virtual void send_invoke( uint64_t id, const fc::string& m, value&& param );
virtual void send_error( uint64_t id, int64_t code, const fc::string& msg );
virtual void send_result( uint64_t id, value&& r );
private:
class impl;
fc::shared_ptr<impl> my;

View file

@ -0,0 +1,28 @@
#pragma once
#include <fc/json_rpc_stream_connection.hpp>
namespace fc {
class tcp_socket;
namespace ip { class endpoint; }
namespace json {
class rpc_tcp_connection : public rpc_stream_connection {
public:
typedef fc::shared_ptr<rpc_tcp_connection> ptr;
rpc_tcp_connection();
rpc_tcp_connection( const rpc_tcp_connection& c );
~rpc_tcp_connection();
void connect_to( const fc::ip::endpoint& e );
void start();
tcp_socket& get_socket()const;
virtual void close();
private:
class impl;
fc::shared_ptr<impl> my;
};
} // json
} // fc

View file

@ -0,0 +1,42 @@
#pragma once
#include <fc/json_rpc_connection.hpp>
namespace fc {
namespace json {
class rpc_tcp_server {
private:
template<typename Interface>
struct add_method_visitor {
add_method_visitor( fc::ptr<Interface>& p, rpc_connection& s ):_ptr(p),_rpcc(s) { }
template<typename Functor>
void operator()( const char* name, Functor& fun ) {
_rpcc.add_method( name, fun );
}
fc::ptr<Interface>& _ptr;
rpc_connection& _rpcc;
};
public:
rpc_tcp_server();
~rpc_tcp_server();
template<typename Interface>
void add_interface( const fc::ptr<Interface>& ptr ) {
on_new_connection( [=]( rpc_connection& c ) {
ptr->visit( detail::add_method_visitor<Interface>( ptr, c ) );
});
}
void on_new_connection( const fc::function<void,rpc_connection&>& c );
void listen( uint16_t port );
private:
class impl;
impl* my;
};
}
}

View file

@ -71,6 +71,9 @@ namespace fc {
template<typename R>
struct lexical_cast<bool, R> { static bool cast( const R& v ) { return v; } };
template<>
struct lexical_cast<char, fc::string> { static bool cast( const fc::string& v ) { return v[0]; } };// TODO: check string len
template<>
struct lexical_cast<bool, fc::string> { static bool cast( const fc::string& v ) { return v == "true"; } };

View file

@ -8,13 +8,13 @@ namespace fc {
struct identity_member {
// TODO: enumerate all method patterns
template<typename R, typename C, typename P>
static fc::function<R()> functor( P&& p, R (C::*mem_func)() ) {
static fc::function<R> functor( P&& p, R (C::*mem_func)() ) {
return [=](){ return (p->*mem_func)(); };
}
template<typename R, typename C, typename A1, typename P>
static fc::function<R(A1)> functor( P&& p, R (C::*mem_func)(A1) ) {
return [=](A1 a1){ return (p->*mem_func)(a1); };
static fc::function<R,A1> functor( P&& p, R (C::*mem_func)(A1) ) {
return fc::function<R,A1>([=](A1 a1){ return (p->*mem_func)(a1); });
}
};
@ -44,24 +44,24 @@ namespace fc {
template<typename InterfaceType>
ptr( InterfaceType* p )
:_vtable( new vtable_type() ) {
_vtable->template visit<InterfaceType>( detail::vtable_visitor<InterfaceType*>(p) );
_vtable->template visit_other<InterfaceType>( detail::vtable_visitor<InterfaceType*>(p) );
}
template<typename InterfaceType>
ptr( const fc::shared_ptr<InterfaceType>& p )
:_vtable( new vtable_type() ),_self(p){
_vtable->template visit<InterfaceType>( detail::vtable_visitor<InterfaceType*>(p.get()) );
_vtable->template visit_other<InterfaceType>( detail::vtable_visitor<InterfaceType*>(p.get()) );
}
vtable_type& operator*() { return *_vtable; }
const vtable_type& operator*()const { return *_vtable; }
//vtable_type& operator*() { return *_vtable; }
vtable_type& operator*()const { return *_vtable; }
vtable_type* operator->() { return _vtable.get(); }
const vtable_type* operator->()const { return _vtable.get(); }
//vtable_type* operator->() { return _vtable.get(); }
vtable_type* operator->()const { return _vtable.get(); }
protected:
fc::shared_ptr< vtable_type > _vtable;
fc::shared_ptr< fc::retainable > _self;
fc::shared_ptr< vtable_type > _vtable;
fc::shared_ptr< fc::retainable > _self;
};
@ -72,8 +72,10 @@ namespace fc {
#define FC_STUB_VTABLE_DEFINE_MEMBER( r, data, elem ) \
decltype(Transform::functor( (data*)nullptr, &data::elem)) elem;
#define FC_STUB_VTABLE_DEFINE_VISIT_OTHER( r, data, elem ) \
v( BOOST_PP_STRINGIZE(elem), elem, &T::elem );
#define FC_STUB_VTABLE_DEFINE_VISIT( r, data, elem ) \
v( BOOST_PP_STRINGIZE(elem), elem, &T::elem ); \
v( BOOST_PP_STRINGIZE(elem), elem );
#define FC_STUB( CLASS, METHODS ) \
namespace fc { namespace detail { \
@ -82,7 +84,11 @@ namespace fc { namespace detail { \
vtable(){} \
BOOST_PP_SEQ_FOR_EACH( FC_STUB_VTABLE_DEFINE_MEMBER, CLASS, METHODS ) \
template<typename T, typename Visitor> \
void visit( Visitor&& v ) { \
void visit_other( Visitor&& v ){ \
BOOST_PP_SEQ_FOR_EACH( FC_STUB_VTABLE_DEFINE_VISIT_OTHER, CLASS, METHODS ) \
} \
template<typename Visitor> \
void visit( Visitor&& v ){ \
BOOST_PP_SEQ_FOR_EACH( FC_STUB_VTABLE_DEFINE_VISIT, CLASS, METHODS ) \
} \
}; \

View file

@ -2,24 +2,33 @@
#define _FC_TCP_SOCKET_HPP_
#include <fc/utility.hpp>
#include <fc/fwd.hpp>
#include <fc/iostream.hpp>
namespace fc {
namespace ip { class endpoint; }
class tcp_socket {
class tcp_socket : public iostream {
public:
tcp_socket();
~tcp_socket();
void connect_to( const fc::ip::endpoint& e );
void connect_to( const fc::ip::endpoint& e );
void write( const char* buffer, size_t len );
size_t readsome( char* buffer, size_t max );
size_t read( char* buffer, size_t s );
/// istream interface
/// @{
virtual size_t readsome( char* buffer, size_t max );
virtual istream& read( char* buffer, size_t s );
virtual bool eof()const;
/// @}
/// ostream interface
/// @{
virtual ostream& write( const char* buffer, size_t len );
virtual void flush();
virtual void close();
/// @}
bool is_open()const;
void flush();
private:
friend class tcp_server;
class impl;
@ -28,16 +37,20 @@ namespace fc {
class tcp_server {
public:
tcp_server(uint16_t port=0);
tcp_server();
~tcp_server();
void close();
bool accept( tcp_socket& s );
// void listen( uint16_t port );
void listen( uint16_t port );
private:
// non copyable
tcp_server( const tcp_server& );
tcp_server& operator=(const tcp_server& s );
class impl;
fc::fwd<impl,32> my;
impl* my;
};
} // namesapce fc

View file

@ -10,7 +10,7 @@ FC_START_SHARED_IMPL(fc::http::connection)
int read_until( char* buffer, char* end, char c = '\n' ) {
char* p = buffer;
// try {
while( p < end && sock.read(p,1) ) {
while( p < end && !sock.read(p,1).eof() ) {
if( *p == c ) {
*p = '\0';
return (p - buffer)-1;

View file

@ -41,6 +41,9 @@ namespace fc { namespace json {
};
void rpc_connection::add_method( const fc::string& name, fc::shared_ptr<fc::json::rpc_server_method>&& p ) {
my->_methods[name] = fc::move(p);
}
void rpc_connection::handle_message( const value& v ) {
auto id_itr = v.find( "id" );
auto m_itr = v.find( "method" );
@ -143,7 +146,7 @@ namespace fc { namespace json {
my->_pr_tail = p;
my->_pr_head = my->_pr_tail;
}
send_invoke( ++my->_next_req_id, m, fc::move(param) );
send_invoke( my->_next_req_id++, m, fc::move(param) );
}

View file

@ -1,5 +1,6 @@
#include <fc/json_rpc_stream_connection.hpp>
#include <fc/iostream.hpp>
#include <fc/sstream.hpp>
#include <fc/thread.hpp>
namespace fc { namespace json {
@ -13,6 +14,7 @@ namespace fc { namespace json {
impl( fc::istream& i, fc::ostream& o, rpc_stream_connection& s )
:in(i),out(o),self(s){
slog( "%p", this );
_read_loop_complete = fc::async( [=](){ read_loop(); } );
}
@ -26,17 +28,20 @@ namespace fc { namespace json {
fc::future<void> _read_loop_complete;
void read_loop() {
slog( "%p", this );
fc::string line;
fc::getline( in, line );
while( !in.eof() ) {
try {
fc::value v= fc::json::from_string( line );
slog( "%s", fc::json::to_string(v).c_str() );
self.handle_message(v);
} catch (...) {
wlog( "%s", fc::except_str().c_str() );
}
fc::getline( in, line );
}
slog( "close read loop" );
self.cancel_pending_requests();
if( !!on_close ) on_close();
}
@ -45,17 +50,24 @@ namespace fc { namespace json {
rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o )
:my( new impl(i,o,*this) ){
}
rpc_stream_connection::rpc_stream_connection(){ slog( "default" ); }
rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){}
rpc_stream_connection::~rpc_stream_connection(){
// slog( "%p", my.get() );
}
// the life of the streams must exceed the life of all copies
// of this rpc_stream_connection
void rpc_stream_connection::open( fc::istream& i, fc::ostream& o) {
my.reset( new impl(i,o,*this) );
slog( "open... %p", my.get() );
}
// cancels all pending requests, closes the ostream
// results on_close() being called if the stream is not already closed.
void rpc_stream_connection::close() {
my->out.close();
if( my ) my->out.close();
my.reset(nullptr);
}
/**
@ -66,10 +78,22 @@ namespace fc { namespace json {
}
void rpc_stream_connection::send_invoke( uint64_t id, const fc::string& m, value&& param ) {
fc::stringstream ss;
ss<<"{\"id\":"<<id<<",\"method\":\""<<m<<"\",\"params\":"<<fc::json::to_string(param)<<"}\n";
fc::string o = ss.str();
my->out.write( o.c_str(), o.size() );
}
void rpc_stream_connection::send_error( uint64_t id, int64_t code, const fc::string& msg ) {
fc::stringstream ss;
ss<<"{\"id\":"<<id<<",\"error\":{\"code\":"<<code<<",\"message\":"<<fc::json::to_string(fc::value(msg))<<"}}\n";
fc::string o = ss.str();
my->out.write( o.c_str(), o.size() );
}
void rpc_stream_connection::send_result( uint64_t id, value&& r ) {
fc::stringstream ss;
ss<<"{\"id\":"<<id<<",\"result\":"<<fc::json::to_string(r)<<"}\n";
fc::string o = ss.str();
my->out.write( o.c_str(), o.size() );
}
} } // fc::json

View file

@ -0,0 +1,43 @@
#include <fc/json_rpc_tcp_connection.hpp>
#include <fc/tcp_socket.hpp>
namespace fc {
namespace json {
class rpc_tcp_connection::impl : public fc::retainable {
public:
tcp_socket sock;
~impl(){ slog( "%p", this );}
};
rpc_tcp_connection::rpc_tcp_connection()
:my( new impl() ){
}
rpc_tcp_connection::rpc_tcp_connection( const rpc_tcp_connection& c )
:rpc_stream_connection(c),my(c.my){}
rpc_tcp_connection::~rpc_tcp_connection(){
close();
}
void rpc_tcp_connection::connect_to( const fc::ip::endpoint& ep ) {
my->sock.connect_to(ep);
wlog( "Connected %p", my.get() );
open( my->sock, my->sock );
}
void rpc_tcp_connection::start() {
slog( "open... %p", my.get() );
open( my->sock, my->sock );
}
void rpc_tcp_connection::close() {
slog( "close %p", my.get() );
rpc_stream_connection::close();
//my->sock.close();
}
tcp_socket& rpc_tcp_connection::get_socket()const { return my->sock; }
}
}

View file

@ -0,0 +1,40 @@
#include <fc/json_rpc_tcp_server.hpp>
#include <fc/json_rpc_tcp_connection.hpp>
#include <fc/tcp_socket.hpp>
#include <fc/thread.hpp>
namespace fc {
namespace json {
class rpc_tcp_server::impl {
public:
fc::function<void,rpc_connection&> on_con;
fc::tcp_server tcp_serv;
fc::vector<rpc_tcp_connection::ptr> cons;
};
rpc_tcp_server::rpc_tcp_server()
:my( new impl() ){}
rpc_tcp_server::~rpc_tcp_server() {
delete my;
}
void rpc_tcp_server::on_new_connection( const fc::function<void,rpc_connection&>& c ) {
my->on_con = c;
}
void rpc_tcp_server::listen( uint16_t port ) {
my->tcp_serv.listen(port);
fc::async([this](){
rpc_tcp_connection::ptr con(new rpc_tcp_connection() );
while( my->tcp_serv.accept( con->get_socket() ) ) {
slog( "new connection!" );
my->on_con( *con );
con->start();
my->cons.push_back(con);
con.reset(new rpc_tcp_connection() );
}
});
}
}
}

View file

@ -9,8 +9,9 @@ namespace fc {
class tcp_socket::impl {
public:
impl():_sock( fc::asio::default_io_service() ){}
impl():_sock( fc::asio::default_io_service() ){ slog( "sock %p", this); }
~impl(){
slog( "~sock %p", this );
if( _sock.is_open() ) _sock.close();
}
@ -22,10 +23,18 @@ namespace fc {
tcp_socket::tcp_socket(){}
tcp_socket::~tcp_socket(){}
tcp_socket::~tcp_socket(){ slog( "%p", &my); }
void tcp_socket::flush() {}
void tcp_socket::close() {
my->_sock.close();
}
void tcp_socket::write( const char* buf, size_t len ) {
bool tcp_socket::eof()const {
return !my->_sock.is_open();
}
fc::ostream& tcp_socket::write( const char* buf, size_t len ) {
boost::system::error_code ec;
size_t w = my->_sock.write_some( boost::asio::buffer( buf, len ), ec );
@ -46,6 +55,7 @@ namespace fc {
wlog( "throw" );
throw boost::system::system_error(ec);
}
return *this;
}
size_t tcp_socket::readsome( char* buf, size_t len ) {
boost::system::error_code ec;
@ -63,12 +73,12 @@ namespace fc {
}
return w;
}
size_t tcp_socket::read( char* buffer, size_t s ) {
fc::istream& tcp_socket::read( char* buffer, size_t s ) {
size_t r = readsome( buffer, s );
while( r < s ) {
r += readsome( buffer + r, s - r );
}
return r;
return *this;
}
void tcp_socket::connect_to( const fc::ip::endpoint& e ) {
fc::asio::tcp::connect(my->_sock, fc::asio::tcp::endpoint( boost::asio::ip::address_v4(e.get_address()), e.port() ) );
@ -84,12 +94,14 @@ namespace fc {
boost::asio::ip::tcp::acceptor _accept;
};
void tcp_server::close() {
if( my->_accept.is_open() ) my->_accept.close();
if( my && my->_accept.is_open() ) my->_accept.close();
delete my; my = nullptr;
}
tcp_server::tcp_server(uint16_t port)
:my(port) {
tcp_server::tcp_server()
:my(nullptr) {
}
tcp_server::~tcp_server() {
delete my;
}
@ -103,16 +115,11 @@ namespace fc {
if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
return true;
}
#if 0
void tcp_server::listen( uint16_t port ) {
/*
slog( "listen %d!", port );
my->_accept.bind(
slog( "listen %d!", port );
my->_accept.listen(port);
slog( "listen %d!", port );
*/
if( my ) delete my;
my = new impl(port);
}
#endif
} // namespace fc

View file

@ -1,5 +1,8 @@
#include <fc/json_rpc_stream_connection.hpp>
#include <fc/json_rpc_client.hpp>
#include <fc/json_rpc_tcp_server.hpp>
#include <fc/json_rpc_tcp_connection.hpp>
#include <fc/ip.hpp>
#include <fc/iostream.hpp>
struct test {
@ -20,11 +23,29 @@ FC_STUB( test, (add)(sub)(sub1)(sub2)(sub3)(sub4)(sub5)(sub6)(sub7)(sub8)(sub9)
int main( int argc, char** argv ) {
try {
fc::ptr<test> t( new test() );
fc::json::rpc_tcp_server serv;
serv.add_interface( t );
serv.listen(8001);
slog("...");
{
wlog( "create new connection" );
fc::json::rpc_tcp_connection::ptr con(new fc::json::rpc_tcp_connection());
wlog( "connnect to..." );
con->connect_to( fc::ip::endpoint::from_string("127.0.0.1:8001") );
wlog( "connected, " );
fc::json::rpc_client<test> rpcc( con );
slog( "5+1=%d", rpcc->add(5).wait() );
}
slog( "exit serv" );
/*
fc::json::rpc_connection::ptr con( new fc::json::rpc_stream_connection( fc::cin, fc::cout ) );
fc::json::rpc_client<test> c( con );
slog( "%d", c->add( 5 ).wait() );
slog( "%d", c->add( 6 ).wait() );
*/
slog( "Exiting" );
} catch ( ... ) {