diff --git a/include/fc/rpc/bstate.hpp b/include/fc/rpc/bstate.hpp new file mode 100644 index 0000000..b7c74b7 --- /dev/null +++ b/include/fc/rpc/bstate.hpp @@ -0,0 +1,58 @@ +#pragma once +#include +#include +#include +#include + +namespace fc { namespace rpc { + typedef std::vector params_type; + typedef std::vector result_type; + + struct brequest + { + optional id; + std::string method; + params_type params; + }; + + struct bresponse + { + bresponse(){} + bresponse( int64_t i, result_type r ):id(i),result(r){} + bresponse( int64_t i, error_object r ):id(i),error(r){} + int64_t id = 0; + optional result; + optional error; + }; + + /** binary RPC state */ + class bstate + { + public: + typedef std::function method; + ~bstate(); + + void add_method( const fc::string& name, method m ); + void remove_method( const fc::string& name ); + + result_type local_call( const string& method_name, const params_type& args ); + void handle_reply( const bresponse& response ); + + brequest start_remote_call( const string& method_name, params_type args ); + result_type wait_for_response( uint64_t request_id ); + + void close(); + + void on_unhandled( const std::function& unhandled ); + + private: + uint64_t _next_id = 1; + std::unordered_map::ptr> _awaiting; + std::unordered_map _methods; + std::function _unhandled; + }; +} } // namespace fc::rpc + +FC_REFLECT( fc::rpc::brequest, (id)(method)(params) ); +FC_REFLECT( fc::rpc::bresponse, (id)(result)(error) ) + diff --git a/src/rpc/bstate.cpp b/src/rpc/bstate.cpp new file mode 100644 index 0000000..832f8ce --- /dev/null +++ b/src/rpc/bstate.cpp @@ -0,0 +1,68 @@ +#include +#include +#include + +namespace fc { namespace rpc { +bstate::~bstate() +{ + close(); +} + +void bstate::add_method( const fc::string& name, method m ) +{ + _methods.emplace(std::pair(name,fc::move(m))); +} + +void bstate::remove_method( const fc::string& name ) +{ + _methods.erase(name); +} + +result_type bstate::local_call( const string& method_name, const params_type& args ) +{ + auto method_itr = _methods.find(method_name); + if( method_itr == _methods.end() && _unhandled ) + return _unhandled( method_name, args ); + FC_ASSERT( method_itr != _methods.end(), "Unknown Method: ${name}", ("name",method_name) ); + return method_itr->second(args); +} + +void bstate::handle_reply( const bresponse& bresponse ) +{ + auto await = _awaiting.find( bresponse.id ); + FC_ASSERT( await != _awaiting.end(), "Unknown Response ID: ${id}", ("id",bresponse.id)("bresponse",bresponse) ); + if( bresponse.result ) + await->second->set_value( *bresponse.result ); + else if( bresponse.error ) + { + await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",bresponse.error->message)("data",bresponse) ) ) ); + } + else + await->second->set_value( params_type() ); + _awaiting.erase(await); +} + +brequest bstate::start_remote_call( const string& method_name, params_type args ) +{ + brequest brequest{ _next_id++, method_name, std::move(args) }; + _awaiting[*brequest.id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); + return brequest; +} +result_type bstate::wait_for_response( uint64_t request_id ) +{ + auto itr = _awaiting.find(request_id); + FC_ASSERT( itr != _awaiting.end() ); + return fc::future( itr->second ).wait(); +} +void bstate::close() +{ + for( auto item : _awaiting ) + item.second->set_exception( fc::exception_ptr(new FC_EXCEPTION( eof_exception, "connection closed" )) ); + _awaiting.clear(); +} +void bstate::on_unhandled( const std::function& unhandled ) +{ + _unhandled = unhandled; +} + +} } // namespace fc::rpc