added flexability to json parsing and fixed closing hang in json rpc connection

This commit is contained in:
Daniel Larimer 2013-11-24 22:23:29 -05:00
parent 76b13a741a
commit e1e3a7361b
3 changed files with 55 additions and 19 deletions

View file

@ -50,7 +50,7 @@ namespace fc {
} }
else else
{ {
// elog( "${message} ", ("message", boost::system::system_error(ec).what())); elog( "${message} ", ("message", boost::system::system_error(ec).what()));
p->set_exception( fc::exception_ptr( new fc::exception( p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) ); FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
} }
@ -106,7 +106,7 @@ namespace fc {
{ {
delete the_work; delete the_work;
io->stop(); io->stop();
// TODO: this hangs sometimes.. asio_thread->join(); asio_thread->join();
delete io; delete io;
delete asio_thread; delete asio_thread;
} }

View file

@ -94,6 +94,45 @@ namespace fc
} FC_RETHROW_EXCEPTIONS( warn, "while parsing token '${token}'", } FC_RETHROW_EXCEPTIONS( warn, "while parsing token '${token}'",
("token", token.str() ) ); ("token", token.str() ) );
} }
template<typename T>
fc::string stringFromToken( T& in )
{
fc::stringstream token;
try
{
char c = in.peek();
// if( c != ' ' )
// FC_THROW_EXCEPTION( parse_error_exception,
// "Expected '\"' but read '${char}'",
// ("char", string(&c, (&c) + 1) ) );
// in.get();
while( true )
{
switch( c = in.peek() )
{
case '\\':
token << parseEscape( in );
break;
case '\t':
case ' ':
in.get();
return token.str();
default:
token << c;
in.get();
}
}
// FC_THROW_EXCEPTION( parse_error_exception, "EOF before closing '\"' in string '${token}'",
// ("token", token.str() ) );
}
catch( const fc::eof_exception& eof )
{
return token.str();
}
FC_RETHROW_EXCEPTIONS( warn, "while parsing token '${token}'",
("token", token.str() ) );
}
template<typename T> template<typename T>
variant_object objectFromStream( T& in ) variant_object objectFromStream( T& in )
@ -246,8 +285,9 @@ namespace fc
if( str == "false" ) return false; if( str == "false" ) return false;
else else
{ {
FC_THROW_EXCEPTION( parse_error_exception, "Invalid token '${token}'", return str;
("token",str) ); // FC_THROW_EXCEPTION( parse_error_exception, "Invalid token '${token}'",
// ("token",str) );
} }
} }
} }
@ -298,6 +338,8 @@ namespace fc
case 0x04: // ^D end of transmission case 0x04: // ^D end of transmission
FC_THROW_EXCEPTION( eof_exception, "unexpected end of file" ); FC_THROW_EXCEPTION( eof_exception, "unexpected end of file" );
default: default:
// ilog( "unhandled char '${c}' int ${int}", ("c", fc::string( &c, 1 ) )("int", int(c)) );
return stringFromToken(in);
in.get(); // in.get(); //
ilog( "unhandled char '${c}' int ${int}", ("c", fc::string( &c, 1 ) )("int", int(c)) ); ilog( "unhandled char '${c}' int ${int}", ("c", fc::string( &c, 1 ) )("int", int(c)) );
return variant(); return variant();

View file

@ -35,6 +35,7 @@ namespace fc { namespace rpc {
void send_result( variant id, variant result ) void send_result( variant id, variant result )
{ {
ilog( "send ${i} ${r}", ("i",id)("r",result) );
{ {
fc::scoped_lock<fc::mutex> lock(_write_mutex); fc::scoped_lock<fc::mutex> lock(_write_mutex);
*_out << "{\"id\":"; *_out << "{\"id\":";
@ -57,13 +58,13 @@ namespace fc { namespace rpc {
json::to_stream( *_out, variant(e)); json::to_stream( *_out, variant(e));
*_out << "}}\n"; *_out << "}}\n";
} }
wlog( "exception: ${except}", ("except", variant(e)) ); //wlog( "exception: ${except}", ("except", variant(e)) );
_out->flush(); _out->flush();
} }
void handle_message( const variant_object& obj ) void handle_message( const variant_object& obj )
{ {
wlog( "recv: ${msg}", ("msg", obj) ); // wlog( "recv: ${msg}", ("msg", obj) );
try try
{ {
auto m = obj.find("method"); auto m = obj.find("method");
@ -160,7 +161,7 @@ namespace fc { namespace rpc {
auto data = err.find( "data" ); auto data = err.find( "data" );
if( data != err.end() ) if( data != err.end() )
{ {
wlog( "exception: ${except}", ("except", data->value() ) ); //wlog( "exception: ${except}", ("except", data->value() ) );
await->second->set_exception( data->value().as<exception>().dynamic_copy_exception() ); await->second->set_exception( data->value().as<exception>().dynamic_copy_exception() );
} }
else else
@ -196,31 +197,27 @@ namespace fc { namespace rpc {
try try
{ {
fc::string line; fc::string line;
while( true ) while( !_done.canceled() )
{ {
variant v = json::from_stream(*_in); variant v = json::from_stream(*_in);
///ilog( "input: ${in}", ("in", v ) ); ///ilog( "input: ${in}", ("in", v ) );
wlog( "recv: ${line}", ("line", line) ); //wlog( "recv: ${line}", ("line", line) );
fc::async([=](){ handle_message(v.get_object()); }); fc::async([=](){ handle_message(v.get_object()); });
} }
} }
catch ( eof_exception& eof ) catch ( eof_exception& eof )
{ {
_eof = true; _eof = true;
wlog( "close" );
close( eof.dynamic_copy_exception() ); close( eof.dynamic_copy_exception() );
} }
catch ( exception& e ) catch ( exception& e )
{ {
wlog( "close" );
close( e.dynamic_copy_exception() ); close( e.dynamic_copy_exception() );
} }
catch ( ... ) catch ( ... )
{ {
wlog( "close" );
close( fc::exception_ptr(new FC_EXCEPTION( unhandled_exception, "json connection read error" )) ); close( fc::exception_ptr(new FC_EXCEPTION( unhandled_exception, "json connection read error" )) );
} }
wlog( "close" );
} }
void close( fc::exception_ptr e ) void close( fc::exception_ptr e )
@ -245,6 +242,7 @@ namespace fc { namespace rpc {
if( my->_done.valid() && !my->_done.ready() ) if( my->_done.valid() && !my->_done.ready() )
{ {
my->_done.cancel(); my->_done.cancel();
my->_out->close();
my->_done.wait(); my->_done.wait();
} }
} }
@ -263,17 +261,17 @@ namespace fc { namespace rpc {
{ {
FC_THROW_EXCEPTION( assert_exception, "start should only be called once" ); FC_THROW_EXCEPTION( assert_exception, "start should only be called once" );
} }
wlog( "EXEC!!!\n" );
return my->_done = fc::async( [=](){ my->read_loop(); } ); return my->_done = fc::async( [=](){ my->read_loop(); } );
} }
void json_connection::add_method( const fc::string& name, method m ) void json_connection::add_method( const fc::string& name, method m )
{ {
ilog( "add method ${name}", ("name",name) );
my->_methods.emplace(std::pair<std::string,method>(name,fc::move(m))); my->_methods.emplace(std::pair<std::string,method>(name,fc::move(m)));
} }
void json_connection::add_named_param_method( const fc::string& name, named_param_method m ) void json_connection::add_named_param_method( const fc::string& name, named_param_method m )
{ {
ilog( "add named param method ${name}", ("name",name) );
my->_named_param_methods.emplace(std::pair<std::string,named_param_method>(name,fc::move(m))); my->_named_param_methods.emplace(std::pair<std::string,named_param_method>(name,fc::move(m)));
} }
void json_connection::remove_method( const fc::string& name ) void json_connection::remove_method( const fc::string& name )
@ -349,7 +347,6 @@ namespace fc { namespace rpc {
future<variant> json_connection::async_call( const fc::string& method, const variant& a1 ) future<variant> json_connection::async_call( const fc::string& method, const variant& a1 )
{ {
ilog( "");
auto id = my->_next_id++; auto id = my->_next_id++;
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() ); my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
@ -368,7 +365,6 @@ namespace fc { namespace rpc {
} }
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2 ) future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2 )
{ {
ilog( "");
auto id = my->_next_id++; auto id = my->_next_id++;
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() ); my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
@ -389,7 +385,6 @@ namespace fc { namespace rpc {
} }
future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3 ) future<variant> json_connection::async_call( const fc::string& method, const variant& a1, const variant& a2, const variant& a3 )
{ {
ilog( "");
auto id = my->_next_id++; auto id = my->_next_id++;
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() ); my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
@ -419,7 +414,6 @@ namespace fc { namespace rpc {
} }
future<variant> json_connection::async_call( const fc::string& method, const variant_object& named_args ) future<variant> json_connection::async_call( const fc::string& method, const variant_object& named_args )
{ {
wlog( "${method} ${args}", ("method",method)("args",named_args) );
auto id = my->_next_id++; auto id = my->_next_id++;
my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() ); my->_awaiting[id] = fc::promise<variant>::ptr( new fc::promise<variant>() );
fc::scoped_lock<fc::mutex> lock(my->_write_mutex); fc::scoped_lock<fc::mutex> lock(my->_write_mutex);