abstracting rpc state

This commit is contained in:
Daniel Larimer 2015-03-26 18:17:47 -04:00
parent 41fedc14f3
commit b8d7d3012d
3 changed files with 116 additions and 0 deletions

View file

@ -91,6 +91,7 @@ set( CMAKE_FIND_LIBRARY_SUFFIXES ${ORIGINAL_LIB_SUFFIXES} )
option( UNITY_BUILD OFF ) option( UNITY_BUILD OFF )
set( fc_sources set( fc_sources
src/rpc/state.cpp
src/uint128.cpp src/uint128.cpp
src/real128.cpp src/real128.cpp
src/variant.cpp src/variant.cpp

54
include/fc/rpc/state.hpp Normal file
View file

@ -0,0 +1,54 @@
#pragma once
#include <fc/variant.hpp>
#include <functional>
#include <fc/thread/future.hpp>
namespace fc { namespace rpc {
struct request
{
optional<uint64_t> id;
std::string method;
variants params;
};
struct error_object
{
int64_t code = 0;
std::string message;
optional<variant> data;
};
struct response
{
int64_t id = 0;
optional<fc::variant> result;
optional<error_object> error;
};
class state
{
public:
typedef std::function<variant(const variants&)> method;
~state();
void add_method( const fc::string& name, method m );
void remove_method( const fc::string& name );
variant local_call( const string& method_name, const variants& args );
void handle_reply( const response& response );
request start_remote_call( const string& method_name, variants args );
variant wait_for_response( uint64_t request_id );
void close();
private:
uint64_t _next_id = 1;
std::unordered_map<uint64_t, fc::promise<variant>::ptr> _awaiting;
std::unordered_map<std::string, method> _methods;
};
} } // namespace fc::rpc
FC_REFLECT( fc::rpc::request, (id)(method)(params) );
FC_REFLECT( fc::rpc::error_object, (code)(message)(data) )
FC_REFLECT( fc::rpc::response, (id)(result)(error) )

61
src/rpc/state.cpp Normal file
View file

@ -0,0 +1,61 @@
#include <fc/rpc/state.hpp>
#include <fc/thread/thread.hpp>
#include <fc/reflect/variant.hpp>
namespace fc { namespace rpc {
state::~state()
{
close();
}
void state::add_method( const fc::string& name, method m )
{
_methods.emplace(std::pair<std::string,method>(name,fc::move(m)));
}
void state::remove_method( const fc::string& name )
{
_methods.erase(name);
}
variant state::local_call( const string& method_name, const variants& args )
{
auto method_itr = _methods.find(method_name);
FC_ASSERT( method_itr != _methods.end(), "Unknown Method: ${name}", ("name",method_name) );
return method_itr->second(args);
}
void state::handle_reply( const response& response )
{
auto await = _awaiting.find( response.id );
FC_ASSERT( await != _awaiting.end(), "Unknown Response ID: ${id}", ("id",response.id)("response",response) );
if( response.result )
await->second->set_value( *response.result );
else
{
FC_ASSERT( response.error );
await->second->set_exception( exception_ptr(new FC_EXCEPTION( exception, "${error}", ("error",*response.error) ) ) );
}
_awaiting.erase(await);
}
request state::start_remote_call( const string& method_name, variants args )
{
request request{ _next_id++, method_name, std::move(args) };
_awaiting[*request.id] = fc::promise<variant>::ptr( new fc::promise<variant>("json_connection::async_call") );
return request;
}
variant state::wait_for_response( uint64_t request_id )
{
auto itr = _awaiting.find(request_id);
FC_ASSERT( itr != _awaiting.end() );
return fc::future<variant>( itr->second ).wait();
}
void state::close()
{
for( auto item : _awaiting )
item.second->set_exception( fc::exception_ptr(new FC_EXCEPTION( eof_exception, "connection closed" )) );
_awaiting.clear();
}
} } // namespace fc::rpc