aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/nirio/rpc/rpc_client.cpp
blob: 3d62b57aeabf4d7286f923ed40b241e0f7f36075 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
///
// Copyright 2013,2016 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
//

#include <uhd/transport/nirio/rpc/rpc_client.hpp>
#include <boost/bind.hpp>
#include <boost/version.hpp>
#include <boost/format.hpp>
#include <boost/asio/error.hpp>

#define CHAIN_BLOCKING_XFER(func, exp, status) \
    if (status) { \
        status = (static_cast<size_t>((func)) == exp); \
    } else { \
        UHD_LOG << "rpc_client operation skipped: " #func "\n"; \
    } \

namespace uhd { namespace usrprio_rpc {

using boost::asio::ip::tcp;

rpc_client::rpc_client (
    const std::string& server,
    const std::string& port,
    boost::uint32_t process_id,
    boost::uint32_t host_id
) : _socket(_io_service)
{
    //Fill in handshake info
    _hshake_args_client.version = CURRENT_VERSION;
    _hshake_args_client.oldest_comp_version = OLDEST_COMPATIBLE_VERSION;
    _hshake_args_client.client_id = build_client_id(host_id, process_id);
    _hshake_args_client.boost_archive_version = boost_serialization_archive_utils::get_version();

    try {
        //Synchronous resolve + connect
        tcp::resolver resolver(_io_service);
        //Create flags object with all special flags disabled. Especially the following:
        //- address_configured: Only return addresses if a non-loopback address is configured for the system.
        //- numeric_host: No name resolution should be attempted for host
        //- numeric_service: No name resolution should be attempted for service
        tcp::resolver::query::flags query_flags(tcp::resolver::query::passive);
        tcp::resolver::query query(tcp::v4(), server, port, query_flags);
        tcp::resolver::iterator iterator = resolver.resolve(query);
        boost::asio::connect(_socket, iterator);

        UHD_LOG << "rpc_client connected to server." << std::endl;

        try {
            //Perform handshake
            bool status = true;
            CHAIN_BLOCKING_XFER(
                boost::asio::write(_socket, boost::asio::buffer(&_hshake_args_client, sizeof(_hshake_args_client))),
                sizeof(_hshake_args_client), status);
            CHAIN_BLOCKING_XFER(
                boost::asio::read(_socket, boost::asio::buffer(&_hshake_args_server, sizeof(_hshake_args_server))),
                sizeof(_hshake_args_server), status);

            _request.header.client_id = _hshake_args_server.client_id;

            if (_hshake_args_server.version >= _hshake_args_client.oldest_comp_version &&
                _hshake_args_client.version >= _hshake_args_server.oldest_comp_version &&
                status)
            {
                UHD_LOG << "rpc_client bound to server." << std::endl;
                _wait_for_next_response_header();

                //Spawn a thread for the io_service callback handler. This thread will run until rpc_client is destroyed.
                _io_service_thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service)));
            } else {
                UHD_LOG << "rpc_client handshake failed." << std::endl;
                _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
            }
            UHD_LOG << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") %
                _hshake_args_client.boost_archive_version %
                _hshake_args_server.boost_archive_version;
        } catch (boost::exception&) {
            UHD_LOG << "rpc_client handshake aborted." << std::endl;
            _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
        }
    } catch (boost::exception&) {
        UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl;
        _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
    }
}

rpc_client::~rpc_client () {
    _stop_io_service();
}

const boost::system::error_code& rpc_client::call(
    func_id_t func_id,
    const func_args_writer_t& in_args,
    func_args_reader_t &out_args,
    boost::posix_time::milliseconds timeout
)
{
    boost::mutex::scoped_lock lock(_mutex);

    if (_io_service_thread.get()) {
        _request.header.func_id = func_id;
        in_args.store(_request.data);
        _request.header.func_args_size = _request.data.size();

        _exec_err.clear();

        //Send function call header and args
        bool status = true;
        try {
            CHAIN_BLOCKING_XFER(
                boost::asio::write(_socket, boost::asio::buffer(&_request.header, sizeof(_request.header))),
                sizeof(_request.header), status);
            if (not _request.data.empty())
            {
                CHAIN_BLOCKING_XFER(
                    boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())),
                    _request.data.size(), status);
            }
        } catch (boost::exception&) {
            status = false;
        }

        //Wait for response using condition variable
        if (status) {
            if (!_exec_gate.timed_wait(lock, timeout)) {
                UHD_LOG << "rpc_client function timed out." << std::endl;
                _exec_err.assign(boost::asio::error::timed_out, boost::asio::error::get_system_category());
            }
        } else {
            UHD_LOG << "rpc_client connection dropped." << std::endl;
            _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
            _stop_io_service();
        }

        //Verify that we are talking to the correct endpoint
        if ((_request.header.client_id != _response.header.client_id) && !_exec_err) {
            UHD_LOG << "rpc_client confused about who its talking to." << std::endl;
            _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
        }

        if (!_exec_err) out_args.load(_response.data);
    }

    return _exec_err;
}

void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size_t transferred, size_t expected)
{
    boost::mutex::scoped_lock lock(_mutex);
    _exec_err = err;
    if (!_exec_err && (transferred == expected)) {
        //Response header received. Verify that it is expected
        if (func_args_header_t::match_function(_request.header, _response.header)) {
            if (_response.header.func_args_size)
            {
                _response.data.resize(_response.header.func_args_size);

                //Wait for response data
                boost::asio::async_read(_socket,
                    boost::asio::buffer(&(*_response.data.begin()), _response.data.size()),
                    boost::bind(&rpc_client::_handle_response_data, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred,
                        _response.data.size()));
            } else {
                _handle_response_data(err, 0, 0);
            }
        } else {
            //Unexpected response. Ignore it.
            UHD_LOG << "rpc_client received garbage responses." << std::endl;
            _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());

            _wait_for_next_response_header();
        }
    }

    if (_exec_err) _exec_gate.notify_all();
}

void rpc_client::_handle_response_data(const boost::system::error_code& err, size_t transferred, size_t expected)
{
    boost::mutex::scoped_lock lock(_mutex);
    _exec_err = err;
    if (transferred != expected) {
        _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
    }

    _exec_gate.notify_all();

    _wait_for_next_response_header();
}

void rpc_client::_wait_for_next_response_header() {
    //_mutex must be locked when this call is made
    boost::asio::async_read(
        _socket,
        boost::asio::buffer(&_response.header, sizeof(_response.header)),
        boost::bind(&rpc_client::_handle_response_hdr, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred,
            sizeof(_response.header)));
}

}}