///
// Copyright 2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//
#include
#include
#include
#include
#include
#define CHAIN_BLOCKING_XFER(func, exp, status) \
if (status) { \
status = (static_cast((func)) == exp); \
} else { \
UHD_LOG << "rpc_client operation skipped: " #func "\n"; \
} \
namespace uhd { namespace usrprio_rpc {
using boost::asio::ip::tcp;
rpc_client::rpc_client (
const std::string& server,
const std::string& port,
boost::uint32_t process_id,
boost::uint32_t host_id
) : _socket(_io_service)
{
//Fill in handshake info
_hshake_args_client.version = CURRENT_VERSION;
_hshake_args_client.oldest_comp_version = OLDEST_COMPATIBLE_VERSION;
_hshake_args_client.client_id = build_client_id(host_id, process_id);
_hshake_args_client.boost_archive_version = boost_serialization_archive_utils::get_version();
try {
//Synchronous resolve + connect
tcp::resolver resolver(_io_service);
//Create flags object with all special flags disabled. Especially the following:
//- address_configured: Only return addresses if a non-loopback address is configured for the system.
//- numeric_host: No name resolution should be attempted for host
//- numeric_service: No name resolution should be attempted for service
tcp::resolver::query::flags query_flags(tcp::resolver::query::passive);
tcp::resolver::query query(tcp::v4(), server, port, query_flags);
tcp::resolver::iterator iterator = resolver.resolve(query);
#if BOOST_VERSION < 104700
// default constructor creates end iterator
tcp::resolver::iterator end;
boost::system::error_code error = boost::asio::error::host_not_found;
while (error && iterator != end)
{
_socket.close();
_socket.connect(*iterator++, error);
}
if (error)
throw boost::system::system_error(error);
#else
boost::asio::connect(_socket, iterator);
#endif
UHD_LOG << "rpc_client connected to server." << std::endl;
try {
//Perform handshake
bool status = true;
CHAIN_BLOCKING_XFER(
boost::asio::write(_socket, boost::asio::buffer(&_hshake_args_client, sizeof(_hshake_args_client))),
sizeof(_hshake_args_client), status);
CHAIN_BLOCKING_XFER(
boost::asio::read(_socket, boost::asio::buffer(&_hshake_args_server, sizeof(_hshake_args_server))),
sizeof(_hshake_args_server), status);
_request.header.client_id = _hshake_args_server.client_id;
if (_hshake_args_server.version >= _hshake_args_client.oldest_comp_version &&
_hshake_args_client.version >= _hshake_args_server.oldest_comp_version &&
status)
{
UHD_LOG << "rpc_client bound to server." << std::endl;
_wait_for_next_response_header();
//Spawn a thread for the io_service callback handler. This thread will run until rpc_client is destroyed.
_io_service_thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service)));
} else {
UHD_LOG << "rpc_client handshake failed." << std::endl;
_exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
}
UHD_LOG << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") %
_hshake_args_client.boost_archive_version %
_hshake_args_server.boost_archive_version;
} catch (boost::exception&) {
UHD_LOG << "rpc_client handshake aborted." << std::endl;
_exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
}
} catch (boost::exception&) {
UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl;
_exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
#if BOOST_VERSION < 104700
} catch (std::exception& e) {
UHD_LOG << "rpc_client connection error: " << e.what() << std::endl;
_exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
#endif
}
}
rpc_client::~rpc_client () {
_stop_io_service();
}
const boost::system::error_code& rpc_client::call(
func_id_t func_id,
const func_args_writer_t& in_args,
func_args_reader_t &out_args,
boost::posix_time::milliseconds timeout
)
{
boost::mutex::scoped_lock lock(_mutex);
if (_io_service_thread.get()) {
_request.header.func_id = func_id;
in_args.store(_request.data);
_request.header.func_args_size = _request.data.size();
_exec_err.clear();
//Send function call header and args
bool status = true;
try {
CHAIN_BLOCKING_XFER(
boost::asio::write(_socket, boost::asio::buffer(&_request.header, sizeof(_request.header))),
sizeof(_request.header), status);
if (not _request.data.empty())
{
CHAIN_BLOCKING_XFER(
boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())),
_request.data.size(), status);
}
} catch (boost::exception&) {
status = false;
}
//Wait for response using condition variable
if (status) {
if (!_exec_gate.timed_wait(lock, timeout)) {
UHD_LOG << "rpc_client function timed out." << std::endl;
_exec_err.assign(boost::asio::error::timed_out, boost::asio::error::get_system_category());
}
} else {
UHD_LOG << "rpc_client connection dropped." << std::endl;
_exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
_stop_io_service();
}
//Verify that we are talking to the correct endpoint
if ((_request.header.client_id != _response.header.client_id) && !_exec_err) {
UHD_LOG << "rpc_client confused about who its talking to." << std::endl;
_exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
}
if (!_exec_err) out_args.load(_response.data);
}
return _exec_err;
}
void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size_t transferred, size_t expected)
{
boost::mutex::scoped_lock lock(_mutex);
_exec_err = err;
if (!_exec_err && (transferred == expected)) {
//Response header received. Verify that it is expected
if (func_args_header_t::match_function(_request.header, _response.header)) {
if (_response.header.func_args_size)
{
_response.data.resize(_response.header.func_args_size);
//Wait for response data
boost::asio::async_read(_socket,
boost::asio::buffer(&(*_response.data.begin()), _response.data.size()),
boost::bind(&rpc_client::_handle_response_data, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
_response.data.size()));
} else {
_handle_response_data(err, 0, 0);
}
} else {
//Unexpected response. Ignore it.
UHD_LOG << "rpc_client received garbage responses." << std::endl;
_exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
_wait_for_next_response_header();
}
}
if (_exec_err) _exec_gate.notify_all();
}
void rpc_client::_handle_response_data(const boost::system::error_code& err, size_t transferred, size_t expected)
{
boost::mutex::scoped_lock lock(_mutex);
_exec_err = err;
if (transferred != expected) {
_exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
}
_exec_gate.notify_all();
_wait_for_next_response_header();
}
void rpc_client::_wait_for_next_response_header() {
//_mutex must be locked when this call is made
boost::asio::async_read(
_socket,
boost::asio::buffer(&_response.header, sizeof(_response.header)),
boost::bind(&rpc_client::_handle_response_hdr, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
sizeof(_response.header)));
}
}}