fix bugs with websocket and integrate API support

This commit is contained in:
Daniel Larimer 2015-03-27 16:29:33 -04:00
parent 7008d1419a
commit 74b707999c
6 changed files with 211 additions and 99 deletions

View file

@ -240,8 +240,6 @@ add_executable( api tests/api.cpp )
target_link_libraries( api fc )
include_directories( vendor/websocketpp )
add_executable( websockettest tests/websocket.cpp )
target_link_libraries( websockettest fc )
add_executable( ntp_test ntp_test.cpp )
target_link_libraries( ntp_test fc )

View file

@ -2,6 +2,7 @@
#include <functional>
#include <memory>
#include <string>
#include <fc/any.hpp>
namespace fc { namespace http {
namespace detail {
@ -14,25 +15,20 @@ namespace fc { namespace http {
public:
virtual ~websocket_connection(){};
virtual void send_message( const std::string& message ) = 0;
virtual void close( int64_t code, const std::string& reason ){};
void on_message( const std::string& message ) { _on_message(message); }
void on_message_handler( const std::function<void(const std::string&)>& h ) { _on_message = h; }
void set_session_data( fc::any d ){ _session_data = std::move(d); }
fc::any& get_session_data() { return _session_data; }
private:
fc::any _session_data;
std::function<void(const std::string&)> _on_message;
};
typedef std::shared_ptr<websocket_connection> websocket_connection_ptr;
class websocket_session
{
public:
websocket_session( const websocket_connection_ptr& con )
:_connection(con){}
virtual ~websocket_session(){};
virtual void on_message( const std::string& message ) = 0;
void send_message( const std::string& message ) { _connection->send_message(message); }
private:
websocket_connection_ptr _connection;
};
typedef std::shared_ptr<websocket_session> websocket_session_ptr;
typedef std::function< websocket_session_ptr( const websocket_connection_ptr& ) > session_factory;
typedef std::function<void(const websocket_connection_ptr&)> on_connection_handler;
class websocket_server
{
@ -40,7 +36,7 @@ namespace fc { namespace http {
websocket_server();
~websocket_server();
void on_connection( const session_factory& factory );
void on_connection( const on_connection_handler& handler);
void listen( uint16_t port );
void start_accept();
@ -55,7 +51,7 @@ namespace fc { namespace http {
websocket_client();
~websocket_client();
websocket_session_ptr connect( const std::string& uri, const session_factory& );
websocket_connection_ptr connect( const std::string& uri );
private:
std::unique_ptr<detail::websocket_client_impl> my;
};

View file

@ -111,15 +111,11 @@ namespace fc {
}
/** makes calls to the remote server */
virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() )
{
FC_ASSERT( _remote_connection );
return _remote_connection->receive_call( api_id, method_name, args );
}
virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) = 0;
variant receive_call( api_id_type api_id, const string& method_name, const variants& args = variants() )const
{
wdump( (api_id)(method_name)(args) );
//wdump( (api_id)(method_name)(args) );
FC_ASSERT( _local_apis.size() > api_id );
return _local_apis[api_id]->call( method_name, args );
}
@ -131,19 +127,8 @@ namespace fc {
return _local_apis.size() - 1;
}
void set_remote_connection( const std::shared_ptr<fc::api_connection>& rc )
{
FC_ASSERT( !_remote_connection );
FC_ASSERT( rc != this->shared_from_this() );
_remote_connection = rc;
if( _remote_connection && _remote_connection->remote_connection() != this->shared_from_this() )
_remote_connection->set_remote_connection( this->shared_from_this() );
}
const std::shared_ptr<fc::api_connection>& remote_connection()const { return _remote_connection; }
private:
std::vector< std::unique_ptr<detail::generic_api> > _local_apis;
std::shared_ptr<fc::api_connection> _remote_connection;
struct api_visitor
@ -187,6 +172,27 @@ namespace fc {
};
};
class local_api_connection : public api_connection
{
public:
/** makes calls to the remote server */
virtual variant send_call( api_id_type api_id, const string& method_name, const variants& args = variants() ) override
{
FC_ASSERT( _remote_connection );
return _remote_connection->receive_call( api_id, method_name, args );
}
void set_remote_connection( const std::shared_ptr<fc::api_connection>& rc )
{
FC_ASSERT( !_remote_connection );
FC_ASSERT( rc != this->shared_from_this() );
_remote_connection = rc;
}
const std::shared_ptr<fc::api_connection>& remote_connection()const { return _remote_connection; }
std::shared_ptr<fc::api_connection> _remote_connection;
};
template<typename Api>
detail::generic_api::generic_api( const Api& a, const std::shared_ptr<fc::api_connection>& c )
:_api_connection(c),_api(a)

View file

@ -7,31 +7,32 @@
namespace fc { namespace rpc {
class websocket_api : public api_connection, public fc::rpc::state, public fc::http::websocket_session
class websocket_api_connection : public api_connection
{
public:
websocket_api( fc::http::websocket_connection_ptr c )
:fc::http::websocket_session(c)
websocket_api_connection( fc::http::websocket_connection_ptr c )
:_connection(c)
{
add_method( "call", [this]( const variants& args ) -> variant {
_rpc_state.add_method( "call", [this]( const variants& args ) -> variant {
FC_ASSERT( args.size() == 3 && args[2].is_array() );
return this->receive_call( args[0].as_uint64(),
args[1].as_string(),
args[2].get_array() );
});
_connection->on_message_handler( [&]( const std::string& msg ){ on_message(msg); } );
}
virtual variant send_call( api_id_type api_id,
const string& method_name,
const variants& args = variants() ) override
{
auto request = this->start_remote_call( "call", {api_id, method_name, args} );
send_message( fc::json::to_string(request) );
return wait_for_response( *request.id );
auto request = _rpc_state.start_remote_call( "call", {api_id, method_name, args} );
_connection->send_message( fc::json::to_string(request) );
return _rpc_state.wait_for_response( *request.id );
}
protected:
virtual void on_message( const std::string& message )
void on_message( const std::string& message )
{
auto var = fc::json::from_string(message);
const auto& var_obj = var.get_object();
@ -39,26 +40,28 @@ namespace fc { namespace rpc {
{
auto call = var.as<fc::rpc::request>();
try {
auto result = local_call( call.method, call.params );
auto result = _rpc_state.local_call( call.method, call.params );
if( call.id )
{
send_message( fc::json::to_string( response( *call.id, result ) ) );
_connection->send_message( fc::json::to_string( response( *call.id, result ) ) );
}
}
catch ( const fc::exception& e )
{
if( call.id )
{
send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) );
_connection->send_message( fc::json::to_string( response( *call.id, error_object{ 1, e.to_detail_string(), fc::variant(e)} ) ) );
}
}
}
else
{
auto reply = var.as<fc::rpc::response>();
handle_reply( reply );
_rpc_state.handle_reply( reply );
}
}
fc::http::websocket_connection_ptr _connection;
fc::rpc::state _rpc_state;
};
} } // namespace fc::rpc

View file

@ -4,6 +4,7 @@
#include <websocketpp/server.hpp>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp>
#include <websocketpp/logger/stub.hpp>
#include <fc/optional.hpp>
#include <fc/variant.hpp>
@ -13,8 +14,55 @@
namespace fc { namespace http {
namespace detail {
struct asio_with_stub_log : public websocketpp::config::asio {
typedef asio_with_stub_log type;
typedef asio base;
typedef base::concurrency_type concurrency_type;
typedef base::request_type request_type;
typedef base::response_type response_type;
typedef base::message_type message_type;
typedef base::con_msg_manager_type con_msg_manager_type;
typedef base::endpoint_msg_manager_type endpoint_msg_manager_type;
/// Custom Logging policies
/*typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::elevel> elog_type;
typedef websocketpp::log::syslog<concurrency_type,
websocketpp::log::alevel> alog_type;
*/
//typedef base::alog_type alog_type;
//typedef base::elog_type elog_type;
typedef websocketpp::log::stub elog_type;
typedef websocketpp::log::stub alog_type;
typedef base::rng_type rng_type;
struct transport_config : public base::transport_config {
typedef type::concurrency_type concurrency_type;
typedef type::alog_type alog_type;
typedef type::elog_type elog_type;
typedef type::request_type request_type;
typedef type::response_type response_type;
typedef websocketpp::transport::asio::basic_socket::endpoint
socket_type;
};
typedef websocketpp::transport::asio::endpoint<transport_config>
transport_type;
static const long timeout_open_handshake = 0;
};
using websocketpp::connection_hdl;
typedef websocketpp::server<websocketpp::config::asio> websocket_server_type;
typedef websocketpp::server<asio_with_stub_log> websocket_server_type;
template<typename T>
class websocket_connection_impl : public websocket_connection
@ -23,35 +71,38 @@ namespace fc { namespace http {
websocket_connection_impl( T con )
:_ws_connection(con){}
virtual void send_message( const std::string& message )
virtual void send_message( const std::string& message )override
{
_ws_connection->send( message );
}
virtual void close( int64_t code, const std::string& reason )override
{
_ws_connection->close(code,reason);
}
T _ws_connection;
};
class websocket_server_impl
{
public:
websocket_server_impl()
:_server_thread( fc::thread::current() )
{
_server.clear_access_channels( websocketpp::log::alevel::all );
_server.init_asio(&fc::asio::default_io_service());
_server.set_reuse_addr(true);
_server.set_open_handler( [&]( connection_hdl hdl ){
_server_thread.async( [&](){
wlog( "on open server" );
auto new_con = std::make_shared<websocket_connection_impl<websocket_server_type::connection_ptr>>( _server.get_con_from_hdl(hdl) );
_connections[hdl] = _factory( new_con );
_on_connection( _connections[hdl] = new_con );
}).wait();
});
_server.set_message_handler( [&]( connection_hdl hdl, websocket_server_type::message_ptr msg ){
_server_thread.async( [&](){
auto current_con = _connections.find(hdl);
assert( current_con != _connections.end() );
wdump(("server")(msg->get_payload()));
//wdump(("server")(msg->get_payload()));
current_con->second->on_message( msg->get_payload() );
}).wait();
});
@ -62,21 +113,33 @@ namespace fc { namespace http {
});
_server.set_fail_handler( [&]( connection_hdl hdl ){
if( _server.is_listening() )
{
_server_thread.async( [&](){
_connections.erase( hdl );
}).wait();
}
});
}
~websocket_server_impl()
{
if( _server.is_listening() )
_server.stop_listening();
auto cpy_con = _connections;
for( auto item : cpy_con )
_server.close( item.first, 0, "server exit" );
}
typedef std::map<connection_hdl, websocket_session_ptr,std::owner_less<connection_hdl> > con_map;
typedef std::map<connection_hdl, websocket_connection_ptr,std::owner_less<connection_hdl> > con_map;
con_map _connections;
fc::thread& _server_thread;
websocket_server_type _server;
session_factory _factory;
on_connection_handler _on_connection;
fc::promise<void>::ptr _closed;
};
typedef websocketpp::client<websocketpp::config::asio_client> websocket_client_type;
typedef websocketpp::client<asio_with_stub_log> websocket_client_type;
typedef websocket_client_type::connection_ptr websocket_client_connection_type;
class websocket_client_impl
@ -87,33 +150,40 @@ namespace fc { namespace http {
websocket_client_impl()
:_client_thread( fc::thread::current() )
{
_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
elog( "default open client" );
});
_client.clear_access_channels( websocketpp::log::alevel::all );
_client.set_message_handler( [&]( connection_hdl hdl, message_ptr msg ){
wlog("start wait");
_client_thread.async( [&](){
wdump((msg->get_payload()));
_session->on_message( msg->get_payload() );
// wdump((msg->get_payload()));
_connection->on_message( msg->get_payload() );
}).wait();
wlog("done wait");
});
_client.set_close_handler( [=]( connection_hdl hdl ){
wlog("start wait");
_client_thread.async( [&](){ _session.reset(); } ).wait();
wlog("done wait");
_client_thread.async( [&](){ _connection.reset(); } ).wait();
if( _closed ) _closed->set_value();
});
_client.set_fail_handler( [=]( connection_hdl hdl ){
wlog("start wait");
_client_thread.async( [&](){ _session.reset(); } ).wait();
wlog("done wait");
auto con = _client.get_con_from_hdl(hdl);
auto message = con->get_ec().message();
_client_thread.async( [&](){ _connection.reset(); } ).wait();
if( _connected && !_connected->ready() )
_connected->set_exception( exception_ptr( new FC_EXCEPTION( exception, "${message}", ("message",message)) ) );
if( _closed )
_closed->set_value();
});
_client.init_asio( &fc::asio::default_io_service() );
}
~websocket_client_impl()
{
if(_connection )
{
_connection->close(0, "client closed");
_closed->wait();
}
}
fc::promise<void>::ptr _connected;
fc::promise<void>::ptr _closed;
fc::thread& _client_thread;
websocket_client_type _client;
websocket_session_ptr _session;
websocket_connection_ptr _connection;
};
} // namespace detail
@ -121,9 +191,9 @@ namespace fc { namespace http {
websocket_server::websocket_server():my( new detail::websocket_server_impl() ) {}
websocket_server::~websocket_server(){}
void websocket_server::on_connection( const session_factory& factory )
void websocket_server::on_connection( const on_connection_handler& handler )
{
my->_factory = factory;
my->_on_connection = handler;
}
void websocket_server::listen( uint16_t port )
@ -137,9 +207,9 @@ namespace fc { namespace http {
websocket_client::websocket_client():my( new detail::websocket_client_impl() ) {}
websocket_client::~websocket_client(){ }
websocket_session_ptr websocket_client::connect( const std::string& uri, const session_factory& factory )
websocket_connection_ptr websocket_client::connect( const std::string& uri )
{ try {
wlog( "connecting to ${uri}", ("uri",uri));
// wlog( "connecting to ${uri}", ("uri",uri));
websocketpp::lib::error_code ec;
my->_connected = fc::promise<void>::ptr( new fc::promise<void>("websocket::connect") );
@ -147,7 +217,7 @@ namespace fc { namespace http {
my->_client.set_open_handler( [=]( websocketpp::connection_hdl hdl ){
auto con = my->_client.get_con_from_hdl(hdl);
my->_connection = std::make_shared<detail::websocket_connection_impl<detail::websocket_client_connection_type>>( con );
my->_session = factory( my->_connection );
my->_closed = fc::promise<void>::ptr( new fc::promise<void>("websocket::closed") );
my->_connected->set_value();
});
@ -158,7 +228,7 @@ namespace fc { namespace http {
}
my->_client.connect(con);
my->_connected->wait();
return my->_session;
return my->_connection;
} FC_CAPTURE_AND_RETHROW( (uri) ) }
} } // fc::http

View file

@ -1,6 +1,7 @@
#include <fc/api.hpp>
#include <fc/log/logger.hpp>
#include <fc/rpc/api_connection.hpp>
#include <fc/rpc/websocket_api.hpp>
class calculator
{
@ -12,7 +13,7 @@ class calculator
FC_API( calculator, (add)(sub) )
class nested_api
class login_api
{
public:
fc::api<calculator> get_calc()const
@ -22,7 +23,7 @@ class nested_api
}
fc::optional<fc::api<calculator>> calc;
};
FC_API( nested_api, (get_calc) );
FC_API( login_api, (get_calc) );
using namespace fc;
@ -39,9 +40,46 @@ class variant_calculator
double sub( fc::variant a, fc::variant b ) { return a.as_double()-b.as_double(); }
};
using namespace fc::http;
using namespace fc::rpc;
int main( int argc, char** argv )
{
{
fc::api<calculator> calc_api( std::make_shared<some_calculator>() );
fc::http::websocket_server server;
server.on_connection([&]( const websocket_connection_ptr& c ){
auto wsc = std::make_shared<websocket_api_connection>(c);
auto login = std::make_shared<login_api>();
login->calc = calc_api;
wsc->register_api(fc::api<login_api>(login));
c->set_session_data( wsc );
});
server.listen( 8090 );
server.start_accept();
for( uint32_t i = 0; i < 5000; ++i )
{
try {
fc::http::websocket_client client;
auto con = client.connect( "ws://localhost:8090" );
auto apic = std::make_shared<websocket_api_connection>(con);
auto remote_login_api = apic->get_remote_api<login_api>();
auto remote_calc = remote_login_api->get_calc();
wdump((remote_calc->add( 4, 5 )));
} catch ( const fc::exception& e )
{
edump((e.to_detail_string()));
}
}
wlog( "exit scope" );
}
wlog( "returning now..." );
return 0;
some_calculator calc;
variant_calculator vcalc;
@ -78,9 +116,9 @@ int main( int argc, char** argv )
ilog( "------------------ NESTED TEST --------------" );
try {
nested_api napi_impl;
login_api napi_impl;
napi_impl.calc = api_calc;
fc::api<nested_api> napi(&napi_impl);
fc::api<login_api> napi(&napi_impl);
fc::api_server server;
auto api_id = server.register_api( napi );
@ -93,9 +131,9 @@ int main( int argc, char** argv )
fc::api<api_server> serv( &server );
fc::api_client<nested_api> apic( serv );
fc::api_client<login_api> apic( serv );
fc::api<nested_api> remote_api = apic;
fc::api<login_api> remote_api = apic;
auto remote_calc = remote_api->get_calc();
@ -110,18 +148,19 @@ int main( int argc, char** argv )
ilog( "------------------ NESTED TEST --------------" );
try {
nested_api napi_impl;
login_api napi_impl;
napi_impl.calc = api_calc;
fc::api<nested_api> napi(&napi_impl);
fc::api<login_api> napi(&napi_impl);
auto client_side = std::make_shared<api_connection>();
auto server_side = std::make_shared<api_connection>();
auto client_side = std::make_shared<local_api_connection>();
auto server_side = std::make_shared<local_api_connection>();
server_side->set_remote_connection( client_side );
client_side->set_remote_connection( server_side );
server_side->register_api( napi );
fc::api<nested_api> remote_api = client_side->get_remote_api<nested_api>();
fc::api<login_api> remote_api = client_side->get_remote_api<login_api>();
auto remote_calc = remote_api->get_calc();
int r = remote_calc->add( 4, 5 );