aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/nirio/rpc/rpc_client.cpp68
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp63
-rw-r--r--host/lib/transport/udp_wsa_zero_copy.cpp81
3 files changed, 164 insertions, 48 deletions
diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp
index a5f8cf412..f8dc26b50 100644
--- a/host/lib/transport/nirio/rpc/rpc_client.cpp
+++ b/host/lib/transport/nirio/rpc/rpc_client.cpp
@@ -17,7 +17,9 @@
#include <uhd/transport/nirio/rpc/rpc_client.hpp>
#include <boost/bind.hpp>
+#include <boost/version.hpp>
#include <boost/format.hpp>
+#include <boost/asio/error.hpp>
#define CHAIN_BLOCKING_XFER(func, exp, status) \
if (status) { \
@@ -48,7 +50,23 @@ rpc_client::rpc_client (
tcp::resolver resolver(_io_service);
tcp::resolver::query query(tcp::v4(), server, port);
tcp::resolver::iterator iterator = resolver.resolve(query);
- boost::asio::connect(_socket, iterator);
+
+ #if BOOST_VERSION < 104700
+ // default constructor creates end iterator
+ tcp::resolver::iterator end;
+
+ boost::system::error_code error = boost::asio::error::host_not_found;
+ while (error && iterator != end)
+ {
+ _socket.close();
+ _socket.connect(*iterator++, error);
+ }
+ if (error)
+ throw boost::system::system_error(error);
+ #else
+ boost::asio::connect(_socket, iterator);
+ #endif
+
UHD_LOG << "rpc_client connected to server." << std::endl;
try {
@@ -74,18 +92,18 @@ rpc_client::rpc_client (
_io_service_thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service)));
} else {
UHD_LOG << "rpc_client handshake failed." << std::endl;
- _exec_err.assign(boost::asio::error::connection_refused, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
}
UHD_LOG << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") %
_hshake_args_client.boost_archive_version %
_hshake_args_server.boost_archive_version;
} catch (boost::exception&) {
UHD_LOG << "rpc_client handshake aborted." << std::endl;
- _exec_err.assign(boost::asio::error::connection_refused, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category());
}
} catch (boost::exception&) {
UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl;
- _exec_err.assign(boost::asio::error::connection_aborted, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
}
}
@@ -115,9 +133,12 @@ const boost::system::error_code& rpc_client::call(
CHAIN_BLOCKING_XFER(
boost::asio::write(_socket, boost::asio::buffer(&_request.header, sizeof(_request.header))),
sizeof(_request.header), status);
- CHAIN_BLOCKING_XFER(
- boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())),
- _request.data.size(), status);
+ if (not _request.data.empty())
+ {
+ CHAIN_BLOCKING_XFER(
+ boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())),
+ _request.data.size(), status);
+ }
} catch (boost::exception&) {
status = false;
}
@@ -126,18 +147,18 @@ const boost::system::error_code& rpc_client::call(
if (status) {
if (!_exec_gate.timed_wait(lock, timeout)) {
UHD_LOG << "rpc_client function timed out." << std::endl;
- _exec_err.assign(boost::asio::error::timed_out, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::timed_out, boost::asio::error::get_system_category());
}
} else {
UHD_LOG << "rpc_client connection dropped." << std::endl;
- _exec_err.assign(boost::asio::error::connection_aborted, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
_stop_io_service();
}
//Verify that we are talking to the correct endpoint
if ((_request.header.client_id != _response.header.client_id) && !_exec_err) {
UHD_LOG << "rpc_client confused about who its talking to." << std::endl;
- _exec_err.assign(boost::asio::error::operation_aborted, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
}
if (!_exec_err) out_args.load(_response.data);
@@ -153,19 +174,24 @@ void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size
if (!_exec_err && (transferred == expected)) {
//Response header received. Verify that it is expected
if (func_args_header_t::match_function(_request.header, _response.header)) {
- _response.data.resize(_response.header.func_args_size);
-
- //Wait for response data
- boost::asio::async_read(_socket,
- boost::asio::buffer(&(*_response.data.begin()), _response.data.size()),
- boost::bind(&rpc_client::_handle_response_data, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred,
- _response.data.size()));
+ if (_response.header.func_args_size)
+ {
+ _response.data.resize(_response.header.func_args_size);
+
+ //Wait for response data
+ boost::asio::async_read(_socket,
+ boost::asio::buffer(&(*_response.data.begin()), _response.data.size()),
+ boost::bind(&rpc_client::_handle_response_data, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred,
+ _response.data.size()));
+ } else {
+ _handle_response_data(err, 0, 0);
+ }
} else {
//Unexpected response. Ignore it.
UHD_LOG << "rpc_client received garbage responses." << std::endl;
- _exec_err.assign(boost::asio::error::operation_aborted, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
_wait_for_next_response_header();
}
@@ -179,7 +205,7 @@ void rpc_client::_handle_response_data(const boost::system::error_code& err, siz
boost::mutex::scoped_lock lock(_mutex);
_exec_err = err;
if (transferred != expected) {
- _exec_err.assign(boost::asio::error::operation_aborted, boost::system::system_category());
+ _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category());
}
_exec_gate.notify_all();
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 5fdf2594d..5c84327a4 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -154,28 +154,12 @@ public:
/*!
* Flush all transports in the streamer:
- * This calls into get_and_process_single_packet(),
- * so the sequence and flow control are handled.
- * However, the packet payload is discarded.
+ * The packet payload is discarded.
*/
void flush_all(const double timeout = 0.0)
{
- increment_buffer_info(); //increment to next buffer
-
- for (size_t i = 0; i < _props.size(); i++)
- {
- while (true) //while (_props.at(i).get_buff(timeout));
- {
- //receive a single packet from the transport
- try
- {
- if (get_and_process_single_packet(i,
- get_prev_buffer_info(),
- get_curr_buffer_info(),
- timeout) == PACKET_TIMEOUT_ERROR) break;
- }catch(...){}
- }
- }
+ _flush_all(timeout);
+ return;
}
/*!
@@ -379,12 +363,12 @@ private:
******************************************************************/
UHD_INLINE packet_type get_and_process_single_packet(
const size_t index,
- buffers_info_type &prev_buffer_info,
- buffers_info_type &curr_buffer_info,
+ per_buffer_info_type &prev_buffer_info,
+ per_buffer_info_type &curr_buffer_info,
double timeout
){
//get a single packet from the transport layer
- managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff;
+ managed_recv_buffer::sptr &buff = curr_buffer_info.buff;
buff = _props[index].get_buff(timeout);
if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
@@ -405,7 +389,7 @@ private:
}
//extract packet info
- per_buffer_info_type &info = curr_buffer_info[index];
+ per_buffer_info_type &info = curr_buffer_info;
info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32;
_vrt_unpacker(info.vrt_hdr, info.ifpi);
@@ -442,7 +426,7 @@ private:
#endif
//3) check for out of order timestamps
- if (info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){
+ if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){
return PACKET_TIMESTAMP_ERROR;
}
@@ -450,6 +434,33 @@ private:
return PACKET_IF_DATA;
}
+ void _flush_all(double timeout)
+ {
+ for (size_t i = 0; i < _props.size(); i++)
+ {
+ per_buffer_info_type prev_buffer_info, curr_buffer_info;
+ while (true)
+ {
+ //receive a single packet from the transport
+ try
+ {
+ // call into get_and_process_single_packet()
+ // to make sure flow control is handled
+ if (get_and_process_single_packet(
+ i,
+ prev_buffer_info,
+ curr_buffer_info,
+ timeout) == PACKET_TIMEOUT_ERROR) break;
+ } catch(...){}
+ prev_buffer_info = curr_buffer_info;
+ curr_buffer_info.reset();
+ }
+ }
+ get_prev_buffer_info().reset();
+ get_curr_buffer_info().reset();
+ get_next_buffer_info().reset();
+ }
+
/*******************************************************************
* Alignment check:
* Check the received packet for alignment and mark accordingly.
@@ -509,7 +520,7 @@ private:
//receive a single packet from the transport
try{
packet = get_and_process_single_packet(
- index, prev_info, curr_info, timeout
+ index, prev_info[index], curr_info[index], timeout
);
}
@@ -545,7 +556,9 @@ private:
curr_info.metadata.time_spec = next_info[index].time;
curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
+ rx_metadata_t metadata = curr_info.metadata;
_props[index].handle_overflow();
+ curr_info.metadata = metadata;
UHD_MSG(fastpath) << "O";
}
return;
diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp
index 031d26374..52382f84d 100644
--- a/host/lib/transport/udp_wsa_zero_copy.cpp
+++ b/host/lib/transport/udp_wsa_zero_copy.cpp
@@ -276,6 +276,36 @@ public:
size_t get_num_send_frames(void) const {return _num_send_frames;}
size_t get_send_frame_size(void) const {return _send_frame_size;}
+ //! Read back the socket's buffer space reserved for receives
+ size_t get_recv_buff_size(void) {
+ int recv_buff_size = 0;
+ int opt_len = sizeof(recv_buff_size);
+ getsockopt(
+ _sock_fd,
+ SOL_SOCKET,
+ SO_RCVBUF,
+ (char *)&recv_buff_size,
+ (int *)&opt_len
+ );
+
+ return (size_t) recv_buff_size;
+ }
+
+ //! Read back the socket's buffer space reserved for sends
+ size_t get_send_buff_size(void) {
+ int send_buff_size = 0;
+ int opt_len = sizeof(send_buff_size);
+ getsockopt(
+ _sock_fd,
+ SOL_SOCKET,
+ SO_SNDBUF,
+ (char *)&send_buff_size,
+ (int *)&opt_len
+ );
+
+ return (size_t) send_buff_size;
+ }
+
private:
//memory management -> buffers and fifos
const size_t _recv_frame_size, _num_recv_frames;
@@ -292,6 +322,25 @@ private:
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
+void check_usr_buff_size(
+ size_t actual_buff_size,
+ size_t user_buff_size, // Set this to zero for no user-defined preference
+ const std::string tx_rx
+){
+ UHD_LOG << boost::format(
+ "Target %s sock buff size: %d bytes\n"
+ "Actual %s sock buff size: %d bytes"
+ ) % tx_rx % user_buff_size % tx_rx % actual_buff_size << std::endl;
+ if ((user_buff_size != 0.0) and (actual_buff_size < user_buff_size)) UHD_MSG(warning) << boost::format(
+ "The %s buffer could not be resized sufficiently.\n"
+ "Target sock buff size: %d bytes.\n"
+ "Actual sock buff size: %d bytes.\n"
+ "See the transport application notes on buffer resizing.\n"
+ ) % tx_rx % user_buff_size % actual_buff_size;
+}
+
+
+
udp_zero_copy::sptr udp_zero_copy::make(
const std::string &addr,
const std::string &port,
@@ -306,6 +355,34 @@ udp_zero_copy::sptr udp_zero_copy::make(
xport_params.num_recv_frames = size_t(hints.cast<double>("num_recv_frames", default_buff_args.num_recv_frames));
xport_params.send_frame_size = size_t(hints.cast<double>("send_frame_size", default_buff_args.send_frame_size));
xport_params.num_send_frames = size_t(hints.cast<double>("num_send_frames", default_buff_args.num_send_frames));
-
- return sptr(new udp_zero_copy_wsa_impl(addr, port, xport_params, hints));
+
+ //extract buffer size hints from the device addr and check if they match up
+ size_t usr_recv_buff_size = size_t(hints.cast<double>("recv_buff_size", 0.0));
+ size_t usr_send_buff_size = size_t(hints.cast<double>("send_buff_size", 0.0));
+ if (hints.has_key("recv_buff_size")) {
+ if (usr_recv_buff_size < xport_params.recv_frame_size * xport_params.num_recv_frames) {
+ throw uhd::value_error((boost::format(
+ "recv_buff_size must be equal to or greater than (num_recv_frames * recv_frame_size) where num_recv_frames=%d, recv_frame_size=%d")
+ % xport_params.num_recv_frames % xport_params.recv_frame_size).str());
+ }
+ }
+ if (hints.has_key("send_buff_size")) {
+ if (usr_send_buff_size < xport_params.send_frame_size * xport_params.num_send_frames) {
+ throw uhd::value_error((boost::format(
+ "send_buff_size must be equal to or greater than (num_send_frames * send_frame_size) where num_send_frames=%d, send_frame_size=%d")
+ % xport_params.num_send_frames % xport_params.send_frame_size).str());
+ }
+ }
+
+ udp_zero_copy_wsa_impl::sptr udp_trans(
+ new udp_zero_copy_wsa_impl(addr, port, xport_params, hints)
+ );
+
+ // Read back the actual socket buffer sizes
+ buff_params_out.recv_buff_size = udp_trans->get_recv_buff_size();
+ buff_params_out.send_buff_size = udp_trans->get_send_buff_size();
+ check_usr_buff_size(buff_params_out.recv_buff_size, usr_recv_buff_size, "recv");
+ check_usr_buff_size(buff_params_out.send_buff_size, usr_send_buff_size, "send");
+
+ return udp_trans;
}