From e1e3a7361bb914e06d7dc6d67a6629b6727c8629 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Sun, 24 Nov 2013 22:23:29 -0500 Subject: [PATCH] added flexability to json parsing and fixed closing hang in json rpc connection --- src/asio.cpp | 4 ++-- src/io/json.cpp | 46 +++++++++++++++++++++++++++++++++++-- src/rpc/json_connection.cpp | 24 ++++++++----------- 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/asio.cpp b/src/asio.cpp index 57b7cca..6bcebd8 100644 --- a/src/asio.cpp +++ b/src/asio.cpp @@ -50,7 +50,7 @@ namespace fc { } else { - // elog( "${message} ", ("message", boost::system::system_error(ec).what())); + elog( "${message} ", ("message", boost::system::system_error(ec).what())); p->set_exception( fc::exception_ptr( new fc::exception( FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); } @@ -106,7 +106,7 @@ namespace fc { { delete the_work; io->stop(); - // TODO: this hangs sometimes.. asio_thread->join(); + asio_thread->join(); delete io; delete asio_thread; } diff --git a/src/io/json.cpp b/src/io/json.cpp index c8d96ba..a918bf6 100644 --- a/src/io/json.cpp +++ b/src/io/json.cpp @@ -94,6 +94,45 @@ namespace fc } FC_RETHROW_EXCEPTIONS( warn, "while parsing token '${token}'", ("token", token.str() ) ); } + template + fc::string stringFromToken( T& in ) + { + fc::stringstream token; + try + { + char c = in.peek(); + + // if( c != ' ' ) + // FC_THROW_EXCEPTION( parse_error_exception, + // "Expected '\"' but read '${char}'", + // ("char", string(&c, (&c) + 1) ) ); + // in.get(); + while( true ) + { + switch( c = in.peek() ) + { + case '\\': + token << parseEscape( in ); + break; + case '\t': + case ' ': + in.get(); + return token.str(); + default: + token << c; + in.get(); + } + } + // FC_THROW_EXCEPTION( parse_error_exception, "EOF before closing '\"' in string '${token}'", + // ("token", token.str() ) ); + } + catch( const fc::eof_exception& eof ) + { + return token.str(); + } + FC_RETHROW_EXCEPTIONS( warn, "while parsing token '${token}'", + ("token", token.str() ) ); + } template variant_object objectFromStream( T& in ) @@ -246,8 +285,9 @@ namespace fc if( str == "false" ) return false; else { - FC_THROW_EXCEPTION( parse_error_exception, "Invalid token '${token}'", - ("token",str) ); + return str; + // FC_THROW_EXCEPTION( parse_error_exception, "Invalid token '${token}'", + // ("token",str) ); } } } @@ -298,6 +338,8 @@ namespace fc case 0x04: // ^D end of transmission FC_THROW_EXCEPTION( eof_exception, "unexpected end of file" ); default: + // ilog( "unhandled char '${c}' int ${int}", ("c", fc::string( &c, 1 ) )("int", int(c)) ); + return stringFromToken(in); in.get(); // ilog( "unhandled char '${c}' int ${int}", ("c", fc::string( &c, 1 ) )("int", int(c)) ); return variant(); diff --git a/src/rpc/json_connection.cpp b/src/rpc/json_connection.cpp index 5c88752..73feba7 100644 --- a/src/rpc/json_connection.cpp +++ b/src/rpc/json_connection.cpp @@ -35,6 +35,7 @@ namespace fc { namespace rpc { void send_result( variant id, variant result ) { + ilog( "send ${i} ${r}", ("i",id)("r",result) ); { fc::scoped_lock lock(_write_mutex); *_out << "{\"id\":"; @@ -57,13 +58,13 @@ namespace fc { namespace rpc { json::to_stream( *_out, variant(e)); *_out << "}}\n"; } - wlog( "exception: ${except}", ("except", variant(e)) ); + //wlog( "exception: ${except}", ("except", variant(e)) ); _out->flush(); } void handle_message( const variant_object& obj ) { - wlog( "recv: ${msg}", ("msg", obj) ); + // wlog( "recv: ${msg}", ("msg", obj) ); try { auto m = obj.find("method"); @@ -160,7 +161,7 @@ namespace fc { namespace rpc { auto data = err.find( "data" ); if( data != err.end() ) { - wlog( "exception: ${except}", ("except", data->value() ) ); + //wlog( "exception: ${except}", ("except", data->value() ) ); await->second->set_exception( data->value().as().dynamic_copy_exception() ); } else @@ -196,31 +197,27 @@ namespace fc { namespace rpc { try { fc::string line; - while( true ) + while( !_done.canceled() ) { variant v = json::from_stream(*_in); ///ilog( "input: ${in}", ("in", v ) ); - wlog( "recv: ${line}", ("line", line) ); + //wlog( "recv: ${line}", ("line", line) ); fc::async([=](){ handle_message(v.get_object()); }); } } catch ( eof_exception& eof ) { _eof = true; - wlog( "close" ); close( eof.dynamic_copy_exception() ); } catch ( exception& e ) { - wlog( "close" ); close( e.dynamic_copy_exception() ); } catch ( ... ) { - wlog( "close" ); close( fc::exception_ptr(new FC_EXCEPTION( unhandled_exception, "json connection read error" )) ); } - wlog( "close" ); } void close( fc::exception_ptr e ) @@ -245,6 +242,7 @@ namespace fc { namespace rpc { if( my->_done.valid() && !my->_done.ready() ) { my->_done.cancel(); + my->_out->close(); my->_done.wait(); } } @@ -263,17 +261,17 @@ namespace fc { namespace rpc { { FC_THROW_EXCEPTION( assert_exception, "start should only be called once" ); } - - wlog( "EXEC!!!\n" ); return my->_done = fc::async( [=](){ my->read_loop(); } ); } void json_connection::add_method( const fc::string& name, method m ) { + ilog( "add method ${name}", ("name",name) ); my->_methods.emplace(std::pair(name,fc::move(m))); } void json_connection::add_named_param_method( const fc::string& name, named_param_method m ) { + ilog( "add named param method ${name}", ("name",name) ); my->_named_param_methods.emplace(std::pair(name,fc::move(m))); } void json_connection::remove_method( const fc::string& name ) @@ -349,7 +347,6 @@ namespace fc { namespace rpc { future json_connection::async_call( const fc::string& method, const variant& a1 ) { - ilog( ""); auto id = my->_next_id++; my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); @@ -368,7 +365,6 @@ namespace fc { namespace rpc { } future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2 ) { - ilog( ""); auto id = my->_next_id++; my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); @@ -389,7 +385,6 @@ namespace fc { namespace rpc { } future json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3 ) { - ilog( ""); auto id = my->_next_id++; my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); @@ -419,7 +414,6 @@ namespace fc { namespace rpc { } future json_connection::async_call( const fc::string& method, const variant_object& named_args ) { - wlog( "${method} ${args}", ("method",method)("args",named_args) ); auto id = my->_next_id++; my->_awaiting[id] = fc::promise::ptr( new fc::promise() ); fc::scoped_lock lock(my->_write_mutex);