From a6541b825a18d047c20451bdea25a5b568ccbd22 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Mon, 5 Nov 2012 23:34:58 -0500 Subject: [PATCH] json rpc updates, tcp server/client support --- CMakeLists.txt | 2 ++ include/fc/function.hpp | 7 ++-- include/fc/json_rpc_client.hpp | 2 +- include/fc/json_rpc_connection.hpp | 12 ++++--- include/fc/json_rpc_stream_connection.hpp | 6 +++- include/fc/json_rpc_tcp_connection.hpp | 28 +++++++++++++++ include/fc/json_rpc_tcp_server.hpp | 42 ++++++++++++++++++++++ include/fc/lexical_cast.hpp | 3 ++ include/fc/ptr.hpp | 32 ++++++++++------- include/fc/tcp_socket.hpp | 33 +++++++++++------ src/http_connection.cpp | 2 +- src/json_rpc_connection.cpp | 5 ++- src/json_rpc_stream_connection.cpp | 28 +++++++++++++-- src/json_rpc_tcp_connection.cpp | 43 +++++++++++++++++++++++ src/json_rpc_tcp_server.cpp | 40 +++++++++++++++++++++ src/tcp_socket.cpp | 41 ++++++++++++--------- tests/json_rpc_test.cpp | 21 +++++++++++ 17 files changed, 292 insertions(+), 55 deletions(-) create mode 100644 include/fc/json_rpc_tcp_connection.hpp create mode 100644 include/fc/json_rpc_tcp_server.hpp create mode 100644 src/json_rpc_tcp_connection.cpp create mode 100644 src/json_rpc_tcp_server.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 75a8607..5bc208f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,8 @@ set( sources src/http_connection.cpp src/json_rpc_connection.cpp src/json_rpc_stream_connection.cpp + src/json_rpc_tcp_connection.cpp + src/json_rpc_tcp_server.cpp src/value.cpp src/lexical_cast.cpp src/spin_lock.cpp diff --git a/include/fc/function.hpp b/include/fc/function.hpp index 8444797..cb1c0d5 100644 --- a/include/fc/function.hpp +++ b/include/fc/function.hpp @@ -25,8 +25,7 @@ class function { function& operator=( const function& c ) { func = c.func; return *this; } function& operator=( function&& c ) { fc::swap(func,c.func); return *this; } - template - R operator()( Args2... args2)const { return func->call(fc::forward(args2)...); } + R operator()( Args... args)const { return func->call(args...); } bool operator!()const { return !func; } @@ -34,7 +33,7 @@ class function { struct impl_base : public fc::retainable { virtual ~impl_base(){} - virtual R call(Args...) = 0; + virtual R call(Args...)const = 0; }; template @@ -42,7 +41,7 @@ class function { template impl( U&& u ):func( fc::forward(u) ){} - virtual R call(Args... args) { return func(args...); } + virtual R call(Args... args)const { return func(args...); } Functor func; }; diff --git a/include/fc/json_rpc_client.hpp b/include/fc/json_rpc_client.hpp index cf3308e..ed3edb2 100644 --- a/include/fc/json_rpc_client.hpp +++ b/include/fc/json_rpc_client.hpp @@ -54,7 +54,7 @@ namespace fc { namespace json { private: void init() { this->_vtable.reset(new fc::detail::vtable() ); - this->_vtable->template visit( fc::json::detail::vtable_visitor(_con) ); + this->_vtable->template visit_other( fc::json::detail::vtable_visitor(_con) ); } rpc_connection::ptr _con; }; diff --git a/include/fc/json_rpc_connection.hpp b/include/fc/json_rpc_connection.hpp index 629b5e2..8cce426 100644 --- a/include/fc/json_rpc_connection.hpp +++ b/include/fc/json_rpc_connection.hpp @@ -52,10 +52,10 @@ namespace fc { namespace json { template struct add_method_visitor { public: - add_method_visitor( const fc::ptr& p, fc::json::rpc_connection& c ):_ptr(p){} + add_method_visitor( const fc::ptr& p, fc::json::rpc_connection& c ):_ptr(p),_con(c){} - template - void operator()( const char* name, fc::function& meth, Type ); + template + void operator()( const char* name, fc::function& meth); const fc::ptr& _ptr; fc::json::rpc_connection& _con; @@ -111,6 +111,8 @@ namespace fc { namespace json { private: void invoke( detail::pending_result::ptr&& p, const fc::string& m, value&& param ); void add_method( const fc::string& name, rpc_server_method::ptr&& m ); + template + friend struct detail::add_method_visitor; class impl; fc::shared_ptr my; @@ -119,8 +121,8 @@ namespace fc { namespace json { namespace detail { template - template - void add_method_visitor::operator()( const char* name, fc::function& meth, Type ) { + template + void add_method_visitor::operator()( const char* name, fc::function& meth) { _con.add_method( name, rpc_server_method::ptr( new rpc_server_method_impl(meth) ) ); } diff --git a/include/fc/json_rpc_stream_connection.hpp b/include/fc/json_rpc_stream_connection.hpp index 2ece567..0b1e8af 100644 --- a/include/fc/json_rpc_stream_connection.hpp +++ b/include/fc/json_rpc_stream_connection.hpp @@ -10,6 +10,8 @@ namespace fc { public: typedef fc::shared_ptr ptr; rpc_stream_connection( fc::istream&, fc::ostream& ); + rpc_stream_connection(const rpc_stream_connection& ); + rpc_stream_connection(); // the life of the streams must exceed the life of all copies // of this rpc_stream_connection @@ -17,7 +19,7 @@ namespace fc { // cancels all pending requests, closes the ostream // results on_close() being called if the stream is not already closed. - void close(); + virtual void close(); /** * When the connection is closed, call the given method @@ -25,9 +27,11 @@ namespace fc { void on_close( const fc::function& ); protected: + ~rpc_stream_connection(); virtual void send_invoke( uint64_t id, const fc::string& m, value&& param ); virtual void send_error( uint64_t id, int64_t code, const fc::string& msg ); virtual void send_result( uint64_t id, value&& r ); + private: class impl; fc::shared_ptr my; diff --git a/include/fc/json_rpc_tcp_connection.hpp b/include/fc/json_rpc_tcp_connection.hpp new file mode 100644 index 0000000..7f57256 --- /dev/null +++ b/include/fc/json_rpc_tcp_connection.hpp @@ -0,0 +1,28 @@ +#pragma once +#include + +namespace fc { + class tcp_socket; + namespace ip { class endpoint; } + + namespace json { + class rpc_tcp_connection : public rpc_stream_connection { + public: + typedef fc::shared_ptr ptr; + + rpc_tcp_connection(); + rpc_tcp_connection( const rpc_tcp_connection& c ); + ~rpc_tcp_connection(); + + void connect_to( const fc::ip::endpoint& e ); + void start(); + tcp_socket& get_socket()const; + + virtual void close(); + + private: + class impl; + fc::shared_ptr my; + }; + } // json +} // fc diff --git a/include/fc/json_rpc_tcp_server.hpp b/include/fc/json_rpc_tcp_server.hpp new file mode 100644 index 0000000..9e5e19d --- /dev/null +++ b/include/fc/json_rpc_tcp_server.hpp @@ -0,0 +1,42 @@ +#pragma once +#include + +namespace fc { + + namespace json { + class rpc_tcp_server { + private: + template + struct add_method_visitor { + add_method_visitor( fc::ptr& p, rpc_connection& s ):_ptr(p),_rpcc(s) { } + + template + void operator()( const char* name, Functor& fun ) { + _rpcc.add_method( name, fun ); + } + + fc::ptr& _ptr; + rpc_connection& _rpcc; + }; + + public: + rpc_tcp_server(); + ~rpc_tcp_server(); + + template + void add_interface( const fc::ptr& ptr ) { + on_new_connection( [=]( rpc_connection& c ) { + ptr->visit( detail::add_method_visitor( ptr, c ) ); + }); + } + + void on_new_connection( const fc::function& c ); + + void listen( uint16_t port ); + + private: + class impl; + impl* my; + }; + } +} diff --git a/include/fc/lexical_cast.hpp b/include/fc/lexical_cast.hpp index 18b9f6f..b72d8c7 100644 --- a/include/fc/lexical_cast.hpp +++ b/include/fc/lexical_cast.hpp @@ -71,6 +71,9 @@ namespace fc { template struct lexical_cast { static bool cast( const R& v ) { return v; } }; + template<> + struct lexical_cast { static bool cast( const fc::string& v ) { return v[0]; } };// TODO: check string len + template<> struct lexical_cast { static bool cast( const fc::string& v ) { return v == "true"; } }; diff --git a/include/fc/ptr.hpp b/include/fc/ptr.hpp index acc410d..3af839c 100644 --- a/include/fc/ptr.hpp +++ b/include/fc/ptr.hpp @@ -8,13 +8,13 @@ namespace fc { struct identity_member { // TODO: enumerate all method patterns template - static fc::function functor( P&& p, R (C::*mem_func)() ) { + static fc::function functor( P&& p, R (C::*mem_func)() ) { return [=](){ return (p->*mem_func)(); }; } template - static fc::function functor( P&& p, R (C::*mem_func)(A1) ) { - return [=](A1 a1){ return (p->*mem_func)(a1); }; + static fc::function functor( P&& p, R (C::*mem_func)(A1) ) { + return fc::function([=](A1 a1){ return (p->*mem_func)(a1); }); } }; @@ -44,24 +44,24 @@ namespace fc { template ptr( InterfaceType* p ) :_vtable( new vtable_type() ) { - _vtable->template visit( detail::vtable_visitor(p) ); + _vtable->template visit_other( detail::vtable_visitor(p) ); } template ptr( const fc::shared_ptr& p ) :_vtable( new vtable_type() ),_self(p){ - _vtable->template visit( detail::vtable_visitor(p.get()) ); + _vtable->template visit_other( detail::vtable_visitor(p.get()) ); } - vtable_type& operator*() { return *_vtable; } - const vtable_type& operator*()const { return *_vtable; } + //vtable_type& operator*() { return *_vtable; } + vtable_type& operator*()const { return *_vtable; } - vtable_type* operator->() { return _vtable.get(); } - const vtable_type* operator->()const { return _vtable.get(); } + //vtable_type* operator->() { return _vtable.get(); } + vtable_type* operator->()const { return _vtable.get(); } protected: - fc::shared_ptr< vtable_type > _vtable; - fc::shared_ptr< fc::retainable > _self; + fc::shared_ptr< vtable_type > _vtable; + fc::shared_ptr< fc::retainable > _self; }; @@ -72,8 +72,10 @@ namespace fc { #define FC_STUB_VTABLE_DEFINE_MEMBER( r, data, elem ) \ decltype(Transform::functor( (data*)nullptr, &data::elem)) elem; +#define FC_STUB_VTABLE_DEFINE_VISIT_OTHER( r, data, elem ) \ + v( BOOST_PP_STRINGIZE(elem), elem, &T::elem ); #define FC_STUB_VTABLE_DEFINE_VISIT( r, data, elem ) \ - v( BOOST_PP_STRINGIZE(elem), elem, &T::elem ); \ + v( BOOST_PP_STRINGIZE(elem), elem ); #define FC_STUB( CLASS, METHODS ) \ namespace fc { namespace detail { \ @@ -82,7 +84,11 @@ namespace fc { namespace detail { \ vtable(){} \ BOOST_PP_SEQ_FOR_EACH( FC_STUB_VTABLE_DEFINE_MEMBER, CLASS, METHODS ) \ template \ - void visit( Visitor&& v ) { \ + void visit_other( Visitor&& v ){ \ + BOOST_PP_SEQ_FOR_EACH( FC_STUB_VTABLE_DEFINE_VISIT_OTHER, CLASS, METHODS ) \ + } \ + template \ + void visit( Visitor&& v ){ \ BOOST_PP_SEQ_FOR_EACH( FC_STUB_VTABLE_DEFINE_VISIT, CLASS, METHODS ) \ } \ }; \ diff --git a/include/fc/tcp_socket.hpp b/include/fc/tcp_socket.hpp index dac3206..ca55fbd 100644 --- a/include/fc/tcp_socket.hpp +++ b/include/fc/tcp_socket.hpp @@ -2,24 +2,33 @@ #define _FC_TCP_SOCKET_HPP_ #include #include +#include namespace fc { namespace ip { class endpoint; } - class tcp_socket { + class tcp_socket : public iostream { public: tcp_socket(); ~tcp_socket(); - void connect_to( const fc::ip::endpoint& e ); + void connect_to( const fc::ip::endpoint& e ); - void write( const char* buffer, size_t len ); - size_t readsome( char* buffer, size_t max ); - size_t read( char* buffer, size_t s ); + /// istream interface + /// @{ + virtual size_t readsome( char* buffer, size_t max ); + virtual istream& read( char* buffer, size_t s ); + virtual bool eof()const; + /// @} + + /// ostream interface + /// @{ + virtual ostream& write( const char* buffer, size_t len ); + virtual void flush(); + virtual void close(); + /// @} bool is_open()const; - void flush(); - private: friend class tcp_server; class impl; @@ -28,16 +37,20 @@ namespace fc { class tcp_server { public: - tcp_server(uint16_t port=0); + tcp_server(); ~tcp_server(); void close(); bool accept( tcp_socket& s ); -// void listen( uint16_t port ); + void listen( uint16_t port ); private: + // non copyable + tcp_server( const tcp_server& ); + tcp_server& operator=(const tcp_server& s ); + class impl; - fc::fwd my; + impl* my; }; } // namesapce fc diff --git a/src/http_connection.cpp b/src/http_connection.cpp index f933625..c42650c 100644 --- a/src/http_connection.cpp +++ b/src/http_connection.cpp @@ -10,7 +10,7 @@ FC_START_SHARED_IMPL(fc::http::connection) int read_until( char* buffer, char* end, char c = '\n' ) { char* p = buffer; // try { - while( p < end && sock.read(p,1) ) { + while( p < end && !sock.read(p,1).eof() ) { if( *p == c ) { *p = '\0'; return (p - buffer)-1; diff --git a/src/json_rpc_connection.cpp b/src/json_rpc_connection.cpp index 94106c5..cf9ad7b 100644 --- a/src/json_rpc_connection.cpp +++ b/src/json_rpc_connection.cpp @@ -41,6 +41,9 @@ namespace fc { namespace json { }; + void rpc_connection::add_method( const fc::string& name, fc::shared_ptr&& p ) { + my->_methods[name] = fc::move(p); + } void rpc_connection::handle_message( const value& v ) { auto id_itr = v.find( "id" ); auto m_itr = v.find( "method" ); @@ -143,7 +146,7 @@ namespace fc { namespace json { my->_pr_tail = p; my->_pr_head = my->_pr_tail; } - send_invoke( ++my->_next_req_id, m, fc::move(param) ); + send_invoke( my->_next_req_id++, m, fc::move(param) ); } diff --git a/src/json_rpc_stream_connection.cpp b/src/json_rpc_stream_connection.cpp index 4fbfd8d..d88abb1 100644 --- a/src/json_rpc_stream_connection.cpp +++ b/src/json_rpc_stream_connection.cpp @@ -1,5 +1,6 @@ #include #include +#include #include namespace fc { namespace json { @@ -13,6 +14,7 @@ namespace fc { namespace json { impl( fc::istream& i, fc::ostream& o, rpc_stream_connection& s ) :in(i),out(o),self(s){ + slog( "%p", this ); _read_loop_complete = fc::async( [=](){ read_loop(); } ); } @@ -26,17 +28,20 @@ namespace fc { namespace json { fc::future _read_loop_complete; void read_loop() { + slog( "%p", this ); fc::string line; fc::getline( in, line ); while( !in.eof() ) { try { fc::value v= fc::json::from_string( line ); - + slog( "%s", fc::json::to_string(v).c_str() ); + self.handle_message(v); } catch (...) { wlog( "%s", fc::except_str().c_str() ); } fc::getline( in, line ); } + slog( "close read loop" ); self.cancel_pending_requests(); if( !!on_close ) on_close(); } @@ -45,17 +50,24 @@ namespace fc { namespace json { rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o ) :my( new impl(i,o,*this) ){ } + rpc_stream_connection::rpc_stream_connection(){ slog( "default" ); } + rpc_stream_connection::rpc_stream_connection(const rpc_stream_connection& c):my(c.my){} + rpc_stream_connection::~rpc_stream_connection(){ + // slog( "%p", my.get() ); + } // the life of the streams must exceed the life of all copies // of this rpc_stream_connection void rpc_stream_connection::open( fc::istream& i, fc::ostream& o) { my.reset( new impl(i,o,*this) ); + slog( "open... %p", my.get() ); } // cancels all pending requests, closes the ostream // results on_close() being called if the stream is not already closed. void rpc_stream_connection::close() { - my->out.close(); + if( my ) my->out.close(); + my.reset(nullptr); } /** @@ -66,10 +78,22 @@ namespace fc { namespace json { } void rpc_stream_connection::send_invoke( uint64_t id, const fc::string& m, value&& param ) { + fc::stringstream ss; + ss<<"{\"id\":"<out.write( o.c_str(), o.size() ); } void rpc_stream_connection::send_error( uint64_t id, int64_t code, const fc::string& msg ) { + fc::stringstream ss; + ss<<"{\"id\":"<out.write( o.c_str(), o.size() ); } void rpc_stream_connection::send_result( uint64_t id, value&& r ) { + fc::stringstream ss; + ss<<"{\"id\":"<out.write( o.c_str(), o.size() ); } } } // fc::json diff --git a/src/json_rpc_tcp_connection.cpp b/src/json_rpc_tcp_connection.cpp new file mode 100644 index 0000000..845c6be --- /dev/null +++ b/src/json_rpc_tcp_connection.cpp @@ -0,0 +1,43 @@ +#include +#include + +namespace fc { + + namespace json { + class rpc_tcp_connection::impl : public fc::retainable { + public: + tcp_socket sock; + ~impl(){ slog( "%p", this );} + }; + + rpc_tcp_connection::rpc_tcp_connection() + :my( new impl() ){ + } + rpc_tcp_connection::rpc_tcp_connection( const rpc_tcp_connection& c ) + :rpc_stream_connection(c),my(c.my){} + + rpc_tcp_connection::~rpc_tcp_connection(){ + close(); + } + + void rpc_tcp_connection::connect_to( const fc::ip::endpoint& ep ) { + my->sock.connect_to(ep); + wlog( "Connected %p", my.get() ); + open( my->sock, my->sock ); + } + void rpc_tcp_connection::start() { + slog( "open... %p", my.get() ); + open( my->sock, my->sock ); + } + void rpc_tcp_connection::close() { + slog( "close %p", my.get() ); + rpc_stream_connection::close(); + //my->sock.close(); + } + + tcp_socket& rpc_tcp_connection::get_socket()const { return my->sock; } + + + } + +} diff --git a/src/json_rpc_tcp_server.cpp b/src/json_rpc_tcp_server.cpp new file mode 100644 index 0000000..3c64707 --- /dev/null +++ b/src/json_rpc_tcp_server.cpp @@ -0,0 +1,40 @@ +#include +#include +#include +#include + +namespace fc { + namespace json { + class rpc_tcp_server::impl { + public: + fc::function on_con; + fc::tcp_server tcp_serv; + fc::vector cons; + }; + rpc_tcp_server::rpc_tcp_server() + :my( new impl() ){} + rpc_tcp_server::~rpc_tcp_server() { + delete my; + } + + void rpc_tcp_server::on_new_connection( const fc::function& c ) { + my->on_con = c; + } + + void rpc_tcp_server::listen( uint16_t port ) { + my->tcp_serv.listen(port); + fc::async([this](){ + rpc_tcp_connection::ptr con(new rpc_tcp_connection() ); + while( my->tcp_serv.accept( con->get_socket() ) ) { + slog( "new connection!" ); + my->on_con( *con ); + con->start(); + my->cons.push_back(con); + con.reset(new rpc_tcp_connection() ); + } + }); + } + + + } +} diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index 8dec3c4..a9fcdd8 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -9,8 +9,9 @@ namespace fc { class tcp_socket::impl { public: - impl():_sock( fc::asio::default_io_service() ){} + impl():_sock( fc::asio::default_io_service() ){ slog( "sock %p", this); } ~impl(){ + slog( "~sock %p", this ); if( _sock.is_open() ) _sock.close(); } @@ -22,10 +23,18 @@ namespace fc { tcp_socket::tcp_socket(){} - tcp_socket::~tcp_socket(){} + tcp_socket::~tcp_socket(){ slog( "%p", &my); } + void tcp_socket::flush() {} + void tcp_socket::close() { + my->_sock.close(); + } - void tcp_socket::write( const char* buf, size_t len ) { + bool tcp_socket::eof()const { + return !my->_sock.is_open(); + } + + fc::ostream& tcp_socket::write( const char* buf, size_t len ) { boost::system::error_code ec; size_t w = my->_sock.write_some( boost::asio::buffer( buf, len ), ec ); @@ -46,6 +55,7 @@ namespace fc { wlog( "throw" ); throw boost::system::system_error(ec); } + return *this; } size_t tcp_socket::readsome( char* buf, size_t len ) { boost::system::error_code ec; @@ -63,12 +73,12 @@ namespace fc { } return w; } - size_t tcp_socket::read( char* buffer, size_t s ) { + fc::istream& tcp_socket::read( char* buffer, size_t s ) { size_t r = readsome( buffer, s ); while( r < s ) { r += readsome( buffer + r, s - r ); } - return r; + return *this; } void tcp_socket::connect_to( const fc::ip::endpoint& e ) { fc::asio::tcp::connect(my->_sock, fc::asio::tcp::endpoint( boost::asio::ip::address_v4(e.get_address()), e.port() ) ); @@ -84,12 +94,14 @@ namespace fc { boost::asio::ip::tcp::acceptor _accept; }; void tcp_server::close() { - if( my->_accept.is_open() ) my->_accept.close(); + if( my && my->_accept.is_open() ) my->_accept.close(); + delete my; my = nullptr; } - tcp_server::tcp_server(uint16_t port) - :my(port) { + tcp_server::tcp_server() + :my(nullptr) { } tcp_server::~tcp_server() { + delete my; } @@ -103,16 +115,11 @@ namespace fc { if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) ); return true; } - #if 0 void tcp_server::listen( uint16_t port ) { - /* - slog( "listen %d!", port ); - my->_accept.bind( - slog( "listen %d!", port ); - my->_accept.listen(port); - slog( "listen %d!", port ); - */ + if( my ) delete my; + my = new impl(port); } - #endif + + } // namespace fc diff --git a/tests/json_rpc_test.cpp b/tests/json_rpc_test.cpp index 911abb0..72a4fff 100644 --- a/tests/json_rpc_test.cpp +++ b/tests/json_rpc_test.cpp @@ -1,5 +1,8 @@ #include #include +#include +#include +#include #include struct test { @@ -20,11 +23,29 @@ FC_STUB( test, (add)(sub)(sub1)(sub2)(sub3)(sub4)(sub5)(sub6)(sub7)(sub8)(sub9) int main( int argc, char** argv ) { try { + fc::ptr t( new test() ); + fc::json::rpc_tcp_server serv; + serv.add_interface( t ); + serv.listen(8001); + slog("..."); + { + wlog( "create new connection" ); + fc::json::rpc_tcp_connection::ptr con(new fc::json::rpc_tcp_connection()); + wlog( "connnect to..." ); + con->connect_to( fc::ip::endpoint::from_string("127.0.0.1:8001") ); + wlog( "connected, " ); + + fc::json::rpc_client rpcc( con ); + slog( "5+1=%d", rpcc->add(5).wait() ); + } + slog( "exit serv" ); + /* fc::json::rpc_connection::ptr con( new fc::json::rpc_stream_connection( fc::cin, fc::cout ) ); fc::json::rpc_client c( con ); slog( "%d", c->add( 5 ).wait() ); slog( "%d", c->add( 6 ).wait() ); + */ slog( "Exiting" ); } catch ( ... ) {