From b8d7d3012d78898d1671f59ee4e6206cfb8a9e5f Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Thu, 26 Mar 2015 18:17:47 -0400 Subject: [PATCH] abstracting rpc state --- CMakeLists.txt | 1 + include/fc/rpc/state.hpp | 54 +++++++++++++++++++++++++++++++++++ src/rpc/state.cpp | 61 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 include/fc/rpc/state.hpp create mode 100644 src/rpc/state.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6bce98e..90cca3f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,6 +91,7 @@ set( CMAKE_FIND_LIBRARY_SUFFIXES ${ORIGINAL_LIB_SUFFIXES} ) option( UNITY_BUILD OFF ) set( fc_sources + src/rpc/state.cpp src/uint128.cpp src/real128.cpp src/variant.cpp diff --git a/include/fc/rpc/state.hpp b/include/fc/rpc/state.hpp new file mode 100644 index 0000000..e42439c --- /dev/null +++ b/include/fc/rpc/state.hpp @@ -0,0 +1,54 @@ +#pragma once +#include +#include +#include + +namespace fc { namespace rpc { + struct request + { + optional id; + std::string method; + variants params; + }; + + struct error_object + { + int64_t code = 0; + std::string message; + optional data; + }; + + struct response + { + int64_t id = 0; + optional result; + optional error; + }; + + class state + { + public: + typedef std::function method; + ~state(); + + void add_method( const fc::string& name, method m ); + void remove_method( const fc::string& name ); + + variant local_call( const string& method_name, const variants& args ); + void handle_reply( const response& response ); + + request start_remote_call( const string& method_name, variants args ); + variant wait_for_response( uint64_t request_id ); + + void close(); + + private: + uint64_t _next_id = 1; + std::unordered_map::ptr> _awaiting; + std::unordered_map _methods; + }; +} } // namespace fc::rpc + +FC_REFLECT( fc::rpc::request, (id)(method)(params) ); +FC_REFLECT( fc::rpc::error_object, (code)(message)(data) ) +FC_REFLECT( fc::rpc::response, (id)(result)(error) ) diff --git a/src/rpc/state.cpp b/src/rpc/state.cpp new file mode 100644 index 0000000..4a67faa --- /dev/null +++ b/src/rpc/state.cpp @@ -0,0 +1,61 @@ +#include +#include +#include + +namespace fc { namespace rpc { +state::~state() +{ + close(); +} + +void state::add_method( const fc::string& name, method m ) +{ + _methods.emplace(std::pair(name,fc::move(m))); +} + +void state::remove_method( const fc::string& name ) +{ + _methods.erase(name); +} + +variant state::local_call( const string& method_name, const variants& args ) +{ + auto method_itr = _methods.find(method_name); + FC_ASSERT( method_itr != _methods.end(), "Unknown Method: ${name}", ("name",method_name) ); + return method_itr->second(args); +} + +void state::handle_reply( const response& response ) +{ + auto await = _awaiting.find( response.id ); + FC_ASSERT( await != _awaiting.end(), "Unknown Response ID: ${id}", ("id",response.id)("response",response) ); + if( response.result ) + await->second->set_value( *response.result ); + else + { + FC_ASSERT( response.error ); + await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",*response.error) ) ) ); + } + _awaiting.erase(await); +} + +request state::start_remote_call( const string& method_name, variants args ) +{ + request request{ _next_id++, method_name, std::move(args) }; + _awaiting[*request.id] = fc::promise::ptr( new fc::promise("json_connection::async_call") ); + return request; +} +variant state::wait_for_response( uint64_t request_id ) +{ + auto itr = _awaiting.find(request_id); + FC_ASSERT( itr != _awaiting.end() ); + return fc::future( itr->second ).wait(); +} +void state::close() +{ + for( auto item : _awaiting ) + item.second->set_exception( fc::exception_ptr(new FC_EXCEPTION( eof_exception, "connection closed" )) ); + _awaiting.clear(); +} + +} } // namespace fc::rpc