diff --git a/CMakeLists.txt b/CMakeLists.txt index e3c6157..e9437db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,12 +41,14 @@ option( UNITY_BUILD OFF ) include_directories( vendor/boost/process/include ) include_directories( ${Boost_INCLUDE_DIR} ) +include_directories( ${OPENSSL_INCLUDE_DIR} ) include_directories( include ) set( sources src/process.cpp src/http_connection.cpp src/json_rpc_connection.cpp + src/json_rpc_stream_connection.cpp src/value.cpp src/lexical_cast.cpp src/spin_lock.cpp @@ -82,7 +84,7 @@ set( sources ) setup_library( fc SOURCES ${sources} ) -setup_executable( json_rpc_test SOURCES tests/json_rpc_test.cpp LIBRARIES fc ${Boost_THREAD_LIBRARY} ${Boost_CONTEXT_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ) +setup_executable( json_rpc_test SOURCES tests/json_rpc_test.cpp LIBRARIES fc ${pthread_library} ${rt_library} ${Boost_THREAD_LIBRARY} ${Boost_CONTEXT_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${rt_library} ) #add_executable( test_vec tests/vector_test.cpp ) #target_link_libraries( test_vec fc ${Boost_SYSTEM_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_THREAD_LIBRARY} ${Boost_CONTEXT_LIBRARY} ) diff --git a/include/fc/function.hpp b/include/fc/function.hpp index fdead59..8444797 100644 --- a/include/fc/function.hpp +++ b/include/fc/function.hpp @@ -28,6 +28,8 @@ class function { template R operator()( Args2... args2)const { return func->call(fc::forward(args2)...); } + bool operator!()const { return !func; } + protected: struct impl_base : public fc::retainable { diff --git a/include/fc/iostream.hpp b/include/fc/iostream.hpp index 06a5809..0e3829a 100644 --- a/include/fc/iostream.hpp +++ b/include/fc/iostream.hpp @@ -76,12 +76,4 @@ namespace fc { v = fc::lexical_cast(str); return o; } - - template - cin_t& operator>>( cin_t& o, T& v ) { - fc::string str; - getline( o, str, ' ' ); - v = fc::lexical_cast(str); - return o; - } } diff --git a/include/fc/json_rpc_connection.hpp b/include/fc/json_rpc_connection.hpp index 36e8f0b..629b5e2 100644 --- a/include/fc/json_rpc_connection.hpp +++ b/include/fc/json_rpc_connection.hpp @@ -2,8 +2,16 @@ #include #include #include +#include namespace fc { namespace json { + class rpc_connection; + + struct rpc_server_method : public fc::retainable { + typedef fc::shared_ptr ptr; + virtual value call( const value& v ) = 0; + }; + namespace detail { struct pending_result : virtual public promise_base { typedef shared_ptr ptr; @@ -31,20 +39,28 @@ namespace fc { namespace json { ~pending_result_impl(){} }; - struct server_method : public fc::retainable { - typedef fc::shared_ptr ptr; - virtual value call( const value& param ) = 0; - }; + template - struct server_method_impl : public server_method { - server_method_impl( const fc::function& a ):func(a){}; - virtual value call( const value& param ) { - return value( func( value_cast(param) ) ); - } - fc::function func; + struct rpc_server_method_impl : public rpc_server_method { + rpc_server_method_impl( const fc::function& f ):func(f){} + virtual value call( const value& v ) { + return value( func( fc::value_cast( v ) ) ); + } + fc::function func; }; - } // namespace detail + template + struct add_method_visitor { + public: + add_method_visitor( const fc::ptr& p, fc::json::rpc_connection& c ):_ptr(p){} + + template + void operator()( const char* name, fc::function& meth, Type ); + + const fc::ptr& _ptr; + fc::json::rpc_connection& _con; + }; + } /** * This is the base JSON RPC connection that handles the protocol @@ -75,22 +91,40 @@ namespace fc { namespace json { template void add_method( const fc::string& name, const fc::function& a ) { - this->add_method( name, detail::server_method::ptr(new detail::server_method_impl(a) ) ); + this->add_method( name, rpc_server_method::ptr(new detail::rpc_server_method_impl(a) ) ); } + template + void add_interface( const fc::ptr& it ) { + it->template visit( detail::add_method_visitor( it, *this ) ); + } + + void add_method( const fc::string& name, const fc::json::rpc_server_method::ptr& func ); + protected: void handle_message( const value& m ); - virtual void send_invoke( uint64_t id, const fc::string& m, value&& param ); - virtual void send_error( uint64_t id, int64_t code, const fc::string& msg ); - virtual void send_result( uint64_t id, value&& r ); + virtual void send_invoke( uint64_t id, const fc::string& m, value&& param ) = 0; + virtual void send_error( uint64_t id, int64_t code, const fc::string& msg ) = 0; + virtual void send_result( uint64_t id, value&& r ) = 0; + private: void invoke( detail::pending_result::ptr&& p, const fc::string& m, value&& param ); - void add_method( const fc::string& name, detail::server_method::ptr&& m ); + void add_method( const fc::string& name, rpc_server_method::ptr&& m ); class impl; fc::shared_ptr my; }; + namespace detail { + + template + template + void add_method_visitor::operator()( const char* name, fc::function& meth, Type ) { + _con.add_method( name, rpc_server_method::ptr( new rpc_server_method_impl(meth) ) ); + } + + } // namespace detail + } } // fc::json diff --git a/include/fc/reflect.hpp b/include/fc/reflect.hpp index 469ca28..219351a 100644 --- a/include/fc/reflect.hpp +++ b/include/fc/reflect.hpp @@ -9,20 +9,12 @@ #define _FC_REFLECT_HPP_ #include -#include -//#include -#include #include #include #include -//#include -//#include #include -//#include -//#include - namespace fc { /** diff --git a/include/fc/sha1.hpp b/include/fc/sha1.hpp index 34ddb7a..cef850f 100644 --- a/include/fc/sha1.hpp +++ b/include/fc/sha1.hpp @@ -34,7 +34,7 @@ namespace fc { private: struct impl; - fwd my; + fwd my; }; template diff --git a/src/json_rpc_connection.cpp b/src/json_rpc_connection.cpp index bab15c5..94106c5 100644 --- a/src/json_rpc_connection.cpp +++ b/src/json_rpc_connection.cpp @@ -23,8 +23,8 @@ namespace fc { namespace json { public: impl():_next_req_id(0){ } ~impl(){ cancel_pending_requests(); } - int64_t _next_req_id; - std::unordered_map _methods; + int64_t _next_req_id; + std::unordered_map _methods; detail::pending_result::ptr _pr_head; detail::pending_result::ptr _pr_tail; diff --git a/src/json_rpc_stream_connection.cpp b/src/json_rpc_stream_connection.cpp index e8d58de..4fbfd8d 100644 --- a/src/json_rpc_stream_connection.cpp +++ b/src/json_rpc_stream_connection.cpp @@ -1,45 +1,78 @@ +#include +#include +#include - /** note the life of i and o must be longer than rpc_connection's life */ - rpc_connection( istream& i, ostream& o ); +namespace fc { namespace json { - /** note the life of i and o must be longer than rpc_connection's life */ - void init( istream& i, ostream& o ); + class rpc_stream_connection::impl : public fc::retainable { + public: + fc::istream& in; + fc::ostream& out; + rpc_stream_connection& self; + fc::function on_close; - istream* _in; - ostream* _out; + impl( fc::istream& i, fc::ostream& o, rpc_stream_connection& s ) + :in(i),out(o),self(s){ + _read_loop_complete = fc::async( [=](){ read_loop(); } ); + } + + ~impl() { + try { + self.cancel_pending_requests(); + _read_loop_complete.cancel(); + _read_loop_complete.wait(); + } catch ( ... ) {} + } fc::future _read_loop_complete; void read_loop() { fc::string line; - fc::getline( *_in, line ); - while( !_in->eof() ) { + fc::getline( in, line ); + while( !in.eof() ) { try { fc::value v= fc::json::from_string( line ); } catch (...) { wlog( "%s", fc::except_str().c_str() ); } - fc::getline( *_in, line ); + fc::getline( in, line ); } - slog( "Exit Read Loop, canceling waiting tasks!" ); - - auto cur = _pr_head; - while( cur ) { - cur->handle_error( "Connection Closed" ); - cur = cur->next; - } - - _pr_head.reset(); - _pr_tail.reset(); + self.cancel_pending_requests(); + if( !!on_close ) on_close(); } + }; - rpc_connection::rpc_connection( istream& i, ostream& o ) - :my( new impl() ) - { - init( i, o ); + rpc_stream_connection::rpc_stream_connection( fc::istream& i, fc::ostream& o ) + :my( new impl(i,o,*this) ){ } - void rpc_connection::init( istream& i, ostream& o ) { - my->_in = &i; - my->_out = &o; - my->_read_loop_complete = fc::async( [=](){ my->read_loop(); } ); + + // the life of the streams must exceed the life of all copies + // of this rpc_stream_connection + void rpc_stream_connection::open( fc::istream& i, fc::ostream& o) { + my.reset( new impl(i,o,*this) ); } + + // cancels all pending requests, closes the ostream + // results on_close() being called if the stream is not already closed. + void rpc_stream_connection::close() { + my->out.close(); + } + + /** + * When the connection is closed, call the given method + */ + void rpc_stream_connection::on_close( const fc::function& oc ) { + my->on_close = oc; + } + + void rpc_stream_connection::send_invoke( uint64_t id, const fc::string& m, value&& param ) { + } + void rpc_stream_connection::send_error( uint64_t id, int64_t code, const fc::string& msg ) { + } + void rpc_stream_connection::send_result( uint64_t id, value&& r ) { + } + +} } // fc::json + + + diff --git a/src/value.cpp b/src/value.cpp index a3c0f2b..a1c8a18 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -270,6 +270,10 @@ value& value::operator=( const value& v ){ gh(v.holder)->copy_helper(holder); return *this; } +bool value::is_null()const { + return strcmp(gh(holder)->type(), "null") == 0; +} + value::object::const_iterator value::find( const char* key )const { if( strcmp(gh(holder)->type(), "object") == 0) {