rpc: Move many method implementations from headers to cpp files

This commit is contained in:
theoreticalbts 2015-06-30 15:28:12 -04:00
parent f461dee432
commit 7bd47af88e
7 changed files with 379 additions and 266 deletions

View file

@ -146,7 +146,6 @@ set( CMAKE_FIND_LIBRARY_SUFFIXES ${ORIGINAL_LIB_SUFFIXES} )
option( UNITY_BUILD OFF )
set( fc_sources
src/rpc/state.cpp
src/uint128.cpp
src/real128.cpp
src/variant.cpp
@ -178,8 +177,11 @@ set( fc_sources
src/interprocess/signals.cpp
src/interprocess/file_mapping.cpp
src/interprocess/mmap_struct.cpp
src/rpc/json_connection.cpp
src/rpc/cli.cpp
src/rpc/http_api.cpp
src/rpc/json_connection.cpp
src/rpc/state.cpp
src/rpc/websocket_api.cpp
src/log/log_message.cpp
src/log/logger.cpp
src/log/appender.cpp

View file

@ -16,83 +16,25 @@ namespace fc { namespace rpc {
class cli : public api_connection
{
public:
~cli()
{
if( _run_complete.valid() )
{
stop();
}
}
virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() )
{
FC_ASSERT(false);
}
virtual variant send_callback( uint64_t callback_id, variants args = variants() )
{
FC_ASSERT(false);
}
virtual void send_notice( uint64_t callback_id, variants args = variants() )
{
FC_ASSERT(false);
}
~cli();
void start()
{
_run_complete = fc::async( [&](){ run(); } );
}
void stop()
{
_run_complete.cancel();
_run_complete.wait();
}
void wait(){ _run_complete.wait(); }
void format_result( const string& method, std::function<string(variant,const variants&)> formatter)
{
_result_formatters[method] = formatter;
}
virtual variant send_call( api_id_type api_id, string method_name, variants args = variants() );
virtual variant send_callback( uint64_t callback_id, variants args = variants() );
virtual void send_notice( uint64_t callback_id, variants args = variants() );
void start();
void stop();
void wait();
void format_result( const string& method, std::function<string(variant,const variants&)> formatter);
virtual void getline( const fc::string& prompt, fc::string& line );
void set_prompt( const string& prompt ) { _prompt = prompt; }
void set_prompt( const string& prompt );
private:
void run()
{
while( !_run_complete.canceled() )
{
try {
std::string line;
try
{
getline( _prompt.c_str(), line );
}
catch ( const fc::eof_exception& e )
{
break;
}
std::cout << line << "\n";
line += char(EOF);
fc::variants args = fc::json::variants_from_string(line);;
if( args.size() == 0 ) continue;
void run();
const string& method = args[0].get_string();
auto result = receive_call( 0, method, variants( args.begin()+1,args.end() ) );
auto itr = _result_formatters.find( method );
if( itr == _result_formatters.end() )
{
std::cout << fc::json::to_pretty_string( result ) << "\n";
}
else
std::cout << itr->second( result, args ) << "\n";
}
catch ( const fc::exception& e )
{
std::cout << e.to_detail_string() << "\n";
}
}
}
std::string _prompt = ">>>";
std::string _prompt = ">>>";
std::map<string,std::function<string(variant,const variants&)> > _result_formatters;
fc::future<void> _run_complete;
};

View file

@ -11,108 +11,24 @@ namespace fc { namespace rpc {
class http_api_connection : public api_connection
{
public:
~http_api_connection()
{
}
http_api_connection();
~http_api_connection();
http_api_connection()
{
_rpc_state.add_method( "call", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 3 && args[2].is_array() );
return this->receive_call( args[0].as_uint64(),
args[1].as_string(),
args[2].get_array() );
});
virtual variant send_call(
api_id_type api_id,
string method_name,
variants args = variants() ) override;
virtual variant send_callback(
uint64_t callback_id,
variants args = variants() ) override;
virtual void send_notice(
uint64_t callback_id,
variants args = variants() ) override;
_rpc_state.add_method( "notice", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_notice( args[0].as_uint64(), args[1].get_array() );
return variant();
});
void on_request(
const fc::http::request& req,
const fc::http::server::response& resp );
_rpc_state.add_method( "callback", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_callback( args[0].as_uint64(), args[1].get_array() );
return variant();
});
_rpc_state.on_unhandled( [&]( const std::string& method_name, const variants& args ){
return this->receive_call( 0, method_name, args );
});
}
virtual variant send_call( api_id_type api_id,
string method_name,
variants args = variants() ) override
{
// HTTP has no way to do this, so do nothing
return variant();
}
virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override
{
// HTTP has no way to do this, so do nothing
return variant();
}
virtual void send_notice( uint64_t callback_id, variants args = variants() ) override
{
// HTTP has no way to do this, so do nothing
return;
}
void on_request( const fc::http::request& req, const fc::http::server::response& resp )
{
// this must be called by outside HTTP server's on_request method
std::string resp_body;
http::reply::status_code resp_status;
try
{
resp.add_header( "Content-Type", "application/json" );
std::string req_body( req.body.begin(), req.body.end() );
auto var = fc::json::from_string( req_body );
const auto& var_obj = var.get_object();
if( var_obj.contains( "method" ) )
{
auto call = var.as<fc::rpc::request>();
try
{
auto result = _rpc_state.local_call( call.method, call.params );
resp_body = fc::json::to_string( fc::rpc::response( *call.id, result ) );
resp_status = http::reply::OK;
}
catch ( const fc::exception& e )
{
resp_body = fc::json::to_string( fc::rpc::response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) );
resp_status = http::reply::InternalServerError;
}
}
else
{
resp_status = http::reply::BadRequest;
resp_body = "";
}
}
catch ( const fc::exception& e )
{
resp_status = http::reply::InternalServerError;
resp_body = "";
wdump((e.to_detail_string()));
}
try
{
resp.set_status( resp_status );
resp.set_length( resp_body.length() );
resp.write( resp_body.c_str(), resp_body.length() );
}
catch( const fc::exception& e )
{
wdump((e.to_detail_string()));
}
return;
}
fc::rpc::state _rpc_state;
};

View file

@ -10,104 +10,25 @@ namespace fc { namespace rpc {
class websocket_api_connection : public api_connection
{
public:
~websocket_api_connection()
{
}
websocket_api_connection( fc::http::websocket_connection& c )
:_connection(c)
{
_rpc_state.add_method( "call", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 3 && args[2].is_array() );
return this->receive_call( args[0].as_uint64(),
args[1].as_string(),
args[2].get_array() );
});
_rpc_state.add_method( "notice", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_notice( args[0].as_uint64(), args[1].get_array() );
return variant();
});
_rpc_state.add_method( "callback", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_callback( args[0].as_uint64(), args[1].get_array() );
return variant();
});
_rpc_state.on_unhandled( [&]( const std::string& method_name, const variants& args ){
return this->receive_call( 0, method_name, args );
});
_connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg,true); } );
_connection.on_http_handler( [&]( const std::string& msg ){ return on_message(msg,false); } );
_connection.closed.connect( [this](){ closed(); } );
}
virtual variant send_call( api_id_type api_id,
string method_name,
variants args = variants() ) override
{
auto request = _rpc_state.start_remote_call( "call", {api_id, std::move(method_name), std::move(args) } );
_connection.send_message( fc::json::to_string(request) );
return _rpc_state.wait_for_response( *request.id );
}
virtual variant send_callback( uint64_t callback_id, variants args = variants() ) override
{
auto request = _rpc_state.start_remote_call( "callback", {callback_id, std::move(args) } );
_connection.send_message( fc::json::to_string(request) );
return _rpc_state.wait_for_response( *request.id );
}
virtual void send_notice( uint64_t callback_id, variants args = variants() ) override
{
fc::rpc::request req{ optional<uint64_t>(), "notice", {callback_id, std::move(args)}};
_connection.send_message( fc::json::to_string(req) );
}
websocket_api_connection( fc::http::websocket_connection& c );
~websocket_api_connection();
virtual variant send_call(
api_id_type api_id,
string method_name,
variants args = variants() ) override;
virtual variant send_callback(
uint64_t callback_id,
variants args = variants() ) override;
virtual void send_notice(
uint64_t callback_id,
variants args = variants() ) override;
protected:
std::string on_message( const std::string& message, bool send_message = true )
{
try {
auto var = fc::json::from_string(message);
const auto& var_obj = var.get_object();
if( var_obj.contains( "method" ) )
{
auto call = var.as<fc::rpc::request>();
try {
auto result = _rpc_state.local_call( call.method, call.params );
if( call.id )
{
auto reply = fc::json::to_string( response( *call.id, result ) );
if( send_message )
_connection.send_message( reply );
return reply;
}
}
catch ( const fc::exception& e )
{
if( call.id )
{
auto reply = fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) );
if( send_message )
_connection.send_message( reply );
std::string on_message(
const std::string& message,
bool send_message = true );
return reply;
}
}
}
else
{
auto reply = var.as<fc::rpc::response>();
_rpc_state.handle_reply( reply );
}
} catch ( const fc::exception& e ) {
wdump((e.to_detail_string()));
return e.to_detail_string();
}
return string();
}
fc::http::websocket_connection& _connection;
fc::rpc::state _rpc_state;
};

View file

@ -30,6 +30,94 @@
namespace fc { namespace rpc {
cli::~cli()
{
if( _run_complete.valid() )
{
stop();
}
}
variant cli::send_call( api_id_type api_id, string method_name, variants args /* = variants() */ )
{
FC_ASSERT(false);
}
variant cli::send_callback( uint64_t callback_id, variants args /* = variants() */ )
{
FC_ASSERT(false);
}
void cli::send_notice( uint64_t callback_id, variants args /* = variants() */ )
{
FC_ASSERT(false);
}
void cli::start()
{
_run_complete = fc::async( [&](){ run(); } );
}
void cli::stop()
{
_run_complete.cancel();
_run_complete.wait();
}
void cli::wait()
{
_run_complete.wait();
}
void cli::format_result( const string& method, std::function<string(variant,const variants&)> formatter)
{
_result_formatters[method] = formatter;
}
void cli::set_prompt( const string& prompt )
{
_prompt = prompt;
}
void cli::run()
{
while( !_run_complete.canceled() )
{
try
{
std::string line;
try
{
getline( _prompt.c_str(), line );
}
catch ( const fc::eof_exception& e )
{
break;
}
std::cout << line << "\n";
line += char(EOF);
fc::variants args = fc::json::variants_from_string(line);;
if( args.size() == 0 )
continue;
const string& method = args[0].get_string();
auto result = receive_call( 0, method, variants( args.begin()+1,args.end() ) );
auto itr = _result_formatters.find( method );
if( itr == _result_formatters.end() )
{
std::cout << fc::json::to_pretty_string( result ) << "\n";
}
else
std::cout << itr->second( result, args ) << "\n";
}
catch ( const fc::exception& e )
{
std::cout << e.to_detail_string() << "\n";
}
}
}
void cli::getline( const fc::string& prompt, fc::string& line)
{
// getting file descriptor for C++ streams is near impossible
@ -69,4 +157,4 @@ void cli::getline( const fc::string& prompt, fc::string& line)
}
}
} }
} } // namespace fc::rpc

123
src/rpc/http_api.cpp Normal file
View file

@ -0,0 +1,123 @@
#include <fc/rpc/http_api.hpp>
namespace fc { namespace rpc {
http_api_connection::~http_api_connection()
{
}
http_api_connection::http_api_connection()
{
_rpc_state.add_method( "call", [this]( const variants& args ) -> variant
{
FC_ASSERT( args.size() == 3 && args[2].is_array() );
return this->receive_call(
args[0].as_uint64(),
args[1].as_string(),
args[2].get_array() );
} );
_rpc_state.add_method( "notice", [this]( const variants& args ) -> variant
{
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_notice(
args[0].as_uint64(),
args[1].get_array() );
return variant();
} );
_rpc_state.add_method( "callback", [this]( const variants& args ) -> variant
{
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_callback(
args[0].as_uint64(),
args[1].get_array() );
return variant();
} );
_rpc_state.on_unhandled( [&]( const std::string& method_name, const variants& args )
{
return this->receive_call( 0, method_name, args );
} );
}
variant http_api_connection::send_call(
api_id_type api_id,
string method_name,
variants args /* = variants() */ )
{
// HTTP has no way to do this, so do nothing
return variant();
}
variant http_api_connection::send_callback(
uint64_t callback_id,
variants args /* = variants() */ )
{
// HTTP has no way to do this, so do nothing
return variant();
}
void http_api_connection::send_notice(
uint64_t callback_id,
variants args /* = variants() */ )
{
// HTTP has no way to do this, so do nothing
return;
}
void http_api_connection::on_request( const fc::http::request& req, const fc::http::server::response& resp )
{
// this must be called by outside HTTP server's on_request method
std::string resp_body;
http::reply::status_code resp_status;
try
{
resp.add_header( "Content-Type", "application/json" );
std::string req_body( req.body.begin(), req.body.end() );
auto var = fc::json::from_string( req_body );
const auto& var_obj = var.get_object();
if( var_obj.contains( "method" ) )
{
auto call = var.as<fc::rpc::request>();
try
{
auto result = _rpc_state.local_call( call.method, call.params );
resp_body = fc::json::to_string( fc::rpc::response( *call.id, result ) );
resp_status = http::reply::OK;
}
catch ( const fc::exception& e )
{
resp_body = fc::json::to_string( fc::rpc::response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) );
resp_status = http::reply::InternalServerError;
}
}
else
{
resp_status = http::reply::BadRequest;
resp_body = "";
}
}
catch ( const fc::exception& e )
{
resp_status = http::reply::InternalServerError;
resp_body = "";
wdump((e.to_detail_string()));
}
try
{
resp.set_status( resp_status );
resp.set_length( resp_body.length() );
resp.write( resp_body.c_str(), resp_body.length() );
}
catch( const fc::exception& e )
{
wdump((e.to_detail_string()));
}
return;
}
} } // namespace fc::rpc

121
src/rpc/websocket_api.cpp Normal file
View file

@ -0,0 +1,121 @@
#include <fc/rpc/websocket_api.hpp>
namespace fc { namespace rpc {
websocket_api_connection::~websocket_api_connection()
{
}
websocket_api_connection::websocket_api_connection( fc::http::websocket_connection& c )
: _connection(c)
{
_rpc_state.add_method( "call", [this]( const variants& args ) -> variant
{
FC_ASSERT( args.size() == 3 && args[2].is_array() );
return this->receive_call(
args[0].as_uint64(),
args[1].as_string(),
args[2].get_array() );
} );
_rpc_state.add_method( "notice", [this]( const variants& args ) -> variant
{
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_notice( args[0].as_uint64(), args[1].get_array() );
return variant();
} );
_rpc_state.add_method( "callback", [this]( const variants& args ) -> variant
{
FC_ASSERT( args.size() == 2 && args[1].is_array() );
this->receive_callback( args[0].as_uint64(), args[1].get_array() );
return variant();
} );
_rpc_state.on_unhandled( [&]( const std::string& method_name, const variants& args )
{
return this->receive_call( 0, method_name, args );
} );
_connection.on_message_handler( [&]( const std::string& msg ){ on_message(msg,true); } );
_connection.on_http_handler( [&]( const std::string& msg ){ return on_message(msg,false); } );
_connection.closed.connect( [this](){ closed(); } );
}
variant websocket_api_connection::send_call(
api_id_type api_id,
string method_name,
variants args /* = variants() */ )
{
auto request = _rpc_state.start_remote_call( "call", {api_id, std::move(method_name), std::move(args) } );
_connection.send_message( fc::json::to_string(request) );
return _rpc_state.wait_for_response( *request.id );
}
variant websocket_api_connection::send_callback(
uint64_t callback_id,
variants args /* = variants() */ )
{
auto request = _rpc_state.start_remote_call( "callback", {callback_id, std::move(args) } );
_connection.send_message( fc::json::to_string(request) );
return _rpc_state.wait_for_response( *request.id );
}
void websocket_api_connection::send_notice(
uint64_t callback_id,
variants args /* = variants() */ )
{
fc::rpc::request req{ optional<uint64_t>(), "notice", {callback_id, std::move(args)}};
_connection.send_message( fc::json::to_string(req) );
}
std::string websocket_api_connection::on_message(
const std::string& message,
bool send_message /* = true */ )
{
try
{
auto var = fc::json::from_string(message);
const auto& var_obj = var.get_object();
if( var_obj.contains( "method" ) )
{
auto call = var.as<fc::rpc::request>();
try
{
auto result = _rpc_state.local_call( call.method, call.params );
if( call.id )
{
auto reply = fc::json::to_string( response( *call.id, result ) );
if( send_message )
_connection.send_message( reply );
return reply;
}
}
catch ( const fc::exception& e )
{
if( call.id )
{
auto reply = fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) );
if( send_message )
_connection.send_message( reply );
return reply;
}
}
}
else
{
auto reply = var.as<fc::rpc::response>();
_rpc_state.handle_reply( reply );
}
}
catch ( const fc::exception& e )
{
wdump((e.to_detail_string()));
return e.to_detail_string();
}
return string();
}
} } // namespace fc::rpc