aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/nirio/rpc/rpc_client.cpp
blob: 53d336e958f8d9c20b1c8e152e9dc3042b44cc28 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
///
// Copyright 2013,2016 Ettus Research LLC
// Copyright 2018 Ettus Research, a National Instruments Company
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/transport/nirio/rpc/rpc_client.hpp>
#include <boost/bind.hpp>
#include <boost/version.hpp>
#include <boost/format.hpp>
#include <boost/asio/error.hpp>

//Runs one step in a chain of blocking socket transfers.
//  func:   expression that performs the transfer; its result is cast to size_t
//  exp:    value the transfer result must equal for the step to succeed
//  status: bool lvalue carrying the state of the chain. While it is true, func
//          is evaluated and status is overwritten with (result == exp). Once it
//          is false, func is NOT evaluated and the skipped operation is logged.
//The body is wrapped in do { } while (0) so that an invocation followed by ';'
//is exactly one statement and can be used safely in an unbraced if/else.
#define CHAIN_BLOCKING_XFER(func, exp, status) \
    do { \
        if (status) { \
            (status) = (static_cast<size_t>((func)) == (exp)); \
        } else { \
            UHD_LOGGER_DEBUG("NIRIO") << "rpc_client operation skipped: " #func "\n"; \
        } \
    } while (0)

namespace uhd { namespace usrprio_rpc {

using boost::asio::ip::tcp;

//Connects to the RPC server at server:port and performs the version handshake.
//  server, port:        passed to the resolver as host and service
//  process_id, host_id: combined by build_client_id() into the client id that
//                       is sent in the handshake
//Failures that surface as boost::exception are not propagated; they are
//recorded in _exec_err instead:
//  - connection_aborted if resolving/connecting threw
//  - connection_refused if the handshake threw, a handshake transfer came up
//    short, or the client/server versions are not mutually compatible
//On success an asynchronous header read is queued and a thread running the
//io_service is started.
rpc_client::rpc_client (
    const std::string& server,
    const std::string& port,
    uint32_t process_id,
    uint32_t host_id
) : _socket(_io_service)
{
    //Describe this client for the handshake
    _hshake_args_client.version = CURRENT_VERSION;
    _hshake_args_client.oldest_comp_version = OLDEST_COMPATIBLE_VERSION;
    _hshake_args_client.client_id = build_client_id(host_id, process_id);
    _hshake_args_client.boost_archive_version = boost_serialization_archive_utils::get_version();

    try {
        //Blocking IPv4 name lookup followed by a blocking connect.
        //The flag set is given explicitly (only `passive`), so the optional
        //address_configured, numeric_host and numeric_service flags stay unset.
        tcp::resolver endpoint_resolver(_io_service);
        tcp::resolver::query::flags lookup_flags(tcp::resolver::query::passive);
        tcp::resolver::query lookup(tcp::v4(), server, port, lookup_flags);
        tcp::resolver::iterator first_endpoint = endpoint_resolver.resolve(lookup);
        boost::asio::connect(_socket, first_endpoint);

        UHD_LOGGER_TRACE("NIRIO") << "rpc_client connected to server." ;

        try {
            //Exchange handshake structs: send ours, then read the server's.
            //The read is skipped (and logged) if the write came up short.
            bool xfer_ok = true;
            CHAIN_BLOCKING_XFER(
                boost::asio::write(_socket, boost::asio::buffer(&_hshake_args_client, sizeof(_hshake_args_client))),
                sizeof(_hshake_args_client), xfer_ok);
            CHAIN_BLOCKING_XFER(
                boost::asio::read(_socket, boost::asio::buffer(&_hshake_args_server, sizeof(_hshake_args_server))),
                sizeof(_hshake_args_server), xfer_ok);

            //Requests carry the client id reported back by the server
            _request.header.client_id = _hshake_args_server.client_id;

            //Each side must be at least as new as the oldest version the other accepts
            const bool server_new_enough =
                _hshake_args_server.version >= _hshake_args_client.oldest_comp_version;
            const bool client_new_enough =
                _hshake_args_client.version >= _hshake_args_server.oldest_comp_version;

            if (!(xfer_ok && server_new_enough && client_new_enough)) {
                UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake failed." ;
                _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
            } else {
                UHD_LOGGER_TRACE("NIRIO") << "rpc_client bound to server." ;

                //Queue the first header read before the worker starts, so the
                //io_service has work when run() is entered.
                _wait_for_next_response_header();

                //Worker thread that runs the io_service completion handlers.
                //It is expected to live until rpc_client is destroyed.
                _io_service_thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service)));
            }

            UHD_LOGGER_TRACE("NIRIO") << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") %
                _hshake_args_client.boost_archive_version %
                _hshake_args_server.boost_archive_version;
        } catch (boost::exception&) {
            UHD_LOGGER_DEBUG("NIRIO") << "rpc_client handshake aborted." ;
            _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
        }
    } catch (boost::exception&) {
        UHD_LOGGER_TRACE("NIRIO") << "rpc_client connection request cancelled/aborted." ;
        _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
    }
}

//Destructor: delegates all teardown to _stop_io_service().
//NOTE(review): _stop_io_service() is defined outside this file; presumably it
//stops the io_service and joins the thread spawned by the constructor - confirm.
rpc_client::~rpc_client ()
{
    _stop_io_service();
}

//Performs one blocking remote call.
//  func_id:  identifier written into the request header
//  in_args:  its store() fills the request payload buffer
//  out_args: its load() receives the response payload, only if no error occurred
//  timeout:  maximum time to wait on _exec_gate for the response
//Returns a reference to _exec_err. If no io_service thread exists, nothing is
//sent and _exec_err is returned untouched. Otherwise _exec_err is cleared first
//and then reflects one of: a send failure (connection_aborted), a timeout
//(timed_out), a client id mismatch (operation_aborted), or whatever the
//response handlers stored.
//_mutex is held for the whole call, except while waiting on _exec_gate.
const boost::system::error_code& rpc_client::call(
    func_id_t func_id,
    const func_args_writer_t& in_args,
    func_args_reader_t &out_args,
    boost::posix_time::milliseconds timeout
)
{
    boost::mutex::scoped_lock guard(_mutex);

    //No worker thread: there is nobody to deliver a response
    if (!_io_service_thread.get()) {
        return _exec_err;
    }

    //Assemble the request
    _request.header.func_id = func_id;
    in_args.store(_request.data);
    _request.header.func_args_size = _request.data.size();

    _exec_err.clear();

    //Blocking send: header first, then the payload if there is one.
    //A short write or an exception marks the send as failed.
    bool sent = true;
    try {
        CHAIN_BLOCKING_XFER(
            boost::asio::write(_socket, boost::asio::buffer(&_request.header, sizeof(_request.header))),
            sizeof(_request.header), sent);
        if (!_request.data.empty()) {
            CHAIN_BLOCKING_XFER(
                boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())),
                _request.data.size(), sent);
        }
    } catch (boost::exception&) {
        sent = false;
    }

    if (!sent) {
        UHD_LOGGER_DEBUG("NIRIO") << "rpc_client connection dropped." ;
        _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
        _stop_io_service();
    } else if (!_exec_gate.timed_wait(guard, timeout)) {
        //The response handlers signal _exec_gate; no signal arrived in time
        UHD_LOGGER_DEBUG("NIRIO") << "rpc_client function timed out." ;
        _exec_err.assign(boost::asio::error::timed_out, boost::asio::error::get_system_category());
    }

    //The response must carry the same client id as the request
    if (!_exec_err && (_request.header.client_id != _response.header.client_id)) {
        UHD_LOGGER_DEBUG("NIRIO") << "rpc_client confused about who its talking to." ;
        _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
    }

    if (!_exec_err) {
        out_args.load(_response.data);
    }

    return _exec_err;
}

//Completion handler for the asynchronous read of a response header.
//  err:         result of the read; stored in _exec_err
//  transferred: number of bytes actually read
//  expected:    number of bytes requested (the header size)
//Runs with _mutex held. For a complete header that matches the outstanding
//request, either the payload read is queued (completion is then reported by
//_handle_response_data) or, for a payload-less response, the call is completed
//right here. A header that does not match is reported as operation_aborted and
//the next header read is queued. Any error wakes the waiters on _exec_gate.
void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size_t transferred, size_t expected)
{
    boost::mutex::scoped_lock lock(_mutex);
    _exec_err = err;
    if (!_exec_err && (transferred == expected)) {
        //Response header received. Verify that it is expected
        if (func_args_header_t::match_function(_request.header, _response.header)) {
            //Size the payload buffer to what the header announces. Doing this
            //for empty responses as well ensures no bytes from an earlier
            //response are left behind for the caller to load.
            _response.data.resize(_response.header.func_args_size);

            if (_response.header.func_args_size)
            {
                //Wait for response data
                boost::asio::async_read(_socket,
                    boost::asio::buffer(&(*_response.data.begin()), _response.data.size()),
                    boost::bind(&rpc_client::_handle_response_data, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred,
                        _response.data.size()));
            } else {
                //No payload follows, so the call is complete. Finish it here
                //instead of calling _handle_response_data(): that function
                //locks _mutex itself, and _mutex (a non-recursive boost::mutex)
                //is already held by this handler, so the nested call would
                //deadlock the io_service thread.
                _exec_gate.notify_all();
                _wait_for_next_response_header();
            }
        } else {
            //Unexpected response. Ignore it.
            UHD_LOGGER_DEBUG("NIRIO") << "rpc_client received garbage responses." ;
            _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());

            _wait_for_next_response_header();
        }
    }

    //Errors (read failure or mismatched header) must release the waiting caller
    if (_exec_err) _exec_gate.notify_all();
}

//Completion handler for the asynchronous read of a response payload.
//  err:         result of the read
//  transferred: number of bytes actually read
//  expected:    number of bytes requested
//Acquires _mutex itself, so it must not be invoked while the calling thread
//already holds _mutex. Stores the outcome in _exec_err (a byte-count mismatch
//is reported as operation_aborted regardless of err), wakes every waiter on
//_exec_gate and queues the read of the next response header.
void rpc_client::_handle_response_data(const boost::system::error_code& err, size_t transferred, size_t expected)
{
    boost::mutex::scoped_lock guard(_mutex);

    if (transferred == expected) {
        _exec_err = err;
    } else {
        //A short read overrides whatever the read itself reported
        _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
    }

    _exec_gate.notify_all();
    _wait_for_next_response_header();
}

//Queues an asynchronous read of one response header into _response.header.
//When the read completes, _handle_response_hdr is invoked with the error code,
//the number of bytes transferred and the header size as the expected count.
void rpc_client::_wait_for_next_response_header() {
    //Precondition: the caller holds _mutex
    const size_t header_bytes = sizeof(_response.header);

    boost::asio::async_read(
        _socket,
        boost::asio::buffer(&_response.header, header_bytes),
        boost::bind(&rpc_client::_handle_response_hdr, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred,
            header_bytes));
}

}}