diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/nirio/rpc/rpc_client.cpp | 68 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 63 | ||||
-rw-r--r-- | host/lib/transport/udp_wsa_zero_copy.cpp | 81 |
3 files changed, 164 insertions, 48 deletions
diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp index a5f8cf412..f8dc26b50 100644 --- a/host/lib/transport/nirio/rpc/rpc_client.cpp +++ b/host/lib/transport/nirio/rpc/rpc_client.cpp @@ -17,7 +17,9 @@ #include <uhd/transport/nirio/rpc/rpc_client.hpp> #include <boost/bind.hpp> +#include <boost/version.hpp> #include <boost/format.hpp> +#include <boost/asio/error.hpp> #define CHAIN_BLOCKING_XFER(func, exp, status) \ if (status) { \ @@ -48,7 +50,23 @@ rpc_client::rpc_client ( tcp::resolver resolver(_io_service); tcp::resolver::query query(tcp::v4(), server, port); tcp::resolver::iterator iterator = resolver.resolve(query); - boost::asio::connect(_socket, iterator); + + #if BOOST_VERSION < 104700 + // default constructor creates end iterator + tcp::resolver::iterator end; + + boost::system::error_code error = boost::asio::error::host_not_found; + while (error && iterator != end) + { + _socket.close(); + _socket.connect(*iterator++, error); + } + if (error) + throw boost::system::system_error(error); + #else + boost::asio::connect(_socket, iterator); + #endif + UHD_LOG << "rpc_client connected to server." << std::endl; try { @@ -74,18 +92,18 @@ rpc_client::rpc_client ( _io_service_thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service))); } else { UHD_LOG << "rpc_client handshake failed." << std::endl; - _exec_err.assign(boost::asio::error::connection_refused, boost::system::system_category()); + _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category()); } UHD_LOG << boost::format("rpc_client archive = %d, rpc_server archive = %d\n.") % _hshake_args_client.boost_archive_version % _hshake_args_server.boost_archive_version; } catch (boost::exception&) { UHD_LOG << "rpc_client handshake aborted." << std::endl; - _exec_err.assign(boost::asio::error::connection_refused, boost::system::system_category()); + _exec_err.assign(boost::asio::error::connection_refused, boost::asio::error::get_system_category()); } } catch (boost::exception&) { UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl; - _exec_err.assign(boost::asio::error::connection_aborted, boost::system::system_category()); + _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); } } @@ -115,9 +133,12 @@ const boost::system::error_code& rpc_client::call( CHAIN_BLOCKING_XFER( boost::asio::write(_socket, boost::asio::buffer(&_request.header, sizeof(_request.header))), sizeof(_request.header), status); - CHAIN_BLOCKING_XFER( - boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())), - _request.data.size(), status); + if (not _request.data.empty()) + { + CHAIN_BLOCKING_XFER( + boost::asio::write(_socket, boost::asio::buffer(&(*_request.data.begin()), _request.data.size())), + _request.data.size(), status); + } } catch (boost::exception&) { status = false; } @@ -126,18 +147,18 @@ const boost::system::error_code& rpc_client::call( if (status) { if (!_exec_gate.timed_wait(lock, timeout)) { UHD_LOG << "rpc_client function timed out." << std::endl; - _exec_err.assign(boost::asio::error::timed_out, boost::system::system_category()); + _exec_err.assign(boost::asio::error::timed_out, boost::asio::error::get_system_category()); } } else { UHD_LOG << "rpc_client connection dropped." << std::endl; - _exec_err.assign(boost::asio::error::connection_aborted, boost::system::system_category()); + _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); _stop_io_service(); } //Verify that we are talking to the correct endpoint if ((_request.header.client_id != _response.header.client_id) && !_exec_err) { UHD_LOG << "rpc_client confused about who its talking to." << std::endl; - _exec_err.assign(boost::asio::error::operation_aborted, boost::system::system_category()); + _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category()); } if (!_exec_err) out_args.load(_response.data); @@ -153,19 +174,24 @@ void rpc_client::_handle_response_hdr(const boost::system::error_code& err, size if (!_exec_err && (transferred == expected)) { //Response header received. Verify that it is expected if (func_args_header_t::match_function(_request.header, _response.header)) { - _response.data.resize(_response.header.func_args_size); - - //Wait for response data - boost::asio::async_read(_socket, - boost::asio::buffer(&(*_response.data.begin()), _response.data.size()), - boost::bind(&rpc_client::_handle_response_data, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred, - _response.data.size())); + if (_response.header.func_args_size) + { + _response.data.resize(_response.header.func_args_size); + + //Wait for response data + boost::asio::async_read(_socket, + boost::asio::buffer(&(*_response.data.begin()), _response.data.size()), + boost::bind(&rpc_client::_handle_response_data, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred, + _response.data.size())); + } else { + _handle_response_data(err, 0, 0); + } } else { //Unexpected response. Ignore it. UHD_LOG << "rpc_client received garbage responses." << std::endl; - _exec_err.assign(boost::asio::error::operation_aborted, boost::system::system_category()); + _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category()); _wait_for_next_response_header(); } @@ -179,7 +205,7 @@ void rpc_client::_handle_response_data(const boost::system::error_code& err, siz boost::mutex::scoped_lock lock(_mutex); _exec_err = err; if (transferred != expected) { - _exec_err.assign(boost::asio::error::operation_aborted, boost::system::system_category()); + _exec_err.assign(boost::asio::error::operation_aborted, boost::asio::error::get_system_category()); } _exec_gate.notify_all(); diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 5fdf2594d..5c84327a4 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -154,28 +154,12 @@ public: /*! * Flush all transports in the streamer: - * This calls into get_and_process_single_packet(), - * so the sequence and flow control are handled. - * However, the packet payload is discarded. + * The packet payload is discarded. */ void flush_all(const double timeout = 0.0) { - increment_buffer_info(); //increment to next buffer - - for (size_t i = 0; i < _props.size(); i++) - { - while (true) //while (_props.at(i).get_buff(timeout)); - { - //receive a single packet from the transport - try - { - if (get_and_process_single_packet(i, - get_prev_buffer_info(), - get_curr_buffer_info(), - timeout) == PACKET_TIMEOUT_ERROR) break; - }catch(...){} - } - } + _flush_all(timeout); + return; } /*! @@ -379,12 +363,12 @@ private: ******************************************************************/ UHD_INLINE packet_type get_and_process_single_packet( const size_t index, - buffers_info_type &prev_buffer_info, - buffers_info_type &curr_buffer_info, + per_buffer_info_type &prev_buffer_info, + per_buffer_info_type &curr_buffer_info, double timeout ){ //get a single packet from the transport layer - managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff; + managed_recv_buffer::sptr &buff = curr_buffer_info.buff; buff = _props[index].get_buff(timeout); if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; @@ -405,7 +389,7 @@ private: } //extract packet info - per_buffer_info_type &info = curr_buffer_info[index]; + per_buffer_info_type &info = curr_buffer_info; info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32; _vrt_unpacker(info.vrt_hdr, info.ifpi); @@ -442,7 +426,7 @@ private: #endif //3) check for out of order timestamps - if (info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ + if (info.ifpi.has_tsf and prev_buffer_info.time > info.time){ return PACKET_TIMESTAMP_ERROR; } @@ -450,6 +434,33 @@ private: return PACKET_IF_DATA; } + void _flush_all(double timeout) + { + for (size_t i = 0; i < _props.size(); i++) + { + per_buffer_info_type prev_buffer_info, curr_buffer_info; + while (true) + { + //receive a single packet from the transport + try + { + // call into get_and_process_single_packet() + // to make sure flow control is handled + if (get_and_process_single_packet( + i, + prev_buffer_info, + curr_buffer_info, + timeout) == PACKET_TIMEOUT_ERROR) break; + } catch(...){} + prev_buffer_info = curr_buffer_info; + curr_buffer_info.reset(); + } + } + get_prev_buffer_info().reset(); + get_curr_buffer_info().reset(); + get_next_buffer_info().reset(); + } + /******************************************************************* * Alignment check: * Check the received packet for alignment and mark accordingly. @@ -509,7 +520,7 @@ private: //receive a single packet from the transport try{ packet = get_and_process_single_packet( - index, prev_info, curr_info, timeout + index, prev_info[index], curr_info[index], timeout ); } @@ -545,7 +556,9 @@ private: curr_info.metadata.time_spec = next_info[index].time; curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ + rx_metadata_t metadata = curr_info.metadata; _props[index].handle_overflow(); + curr_info.metadata = metadata; UHD_MSG(fastpath) << "O"; } return; diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp index 031d26374..52382f84d 100644 --- a/host/lib/transport/udp_wsa_zero_copy.cpp +++ b/host/lib/transport/udp_wsa_zero_copy.cpp @@ -276,6 +276,36 @@ public: size_t get_num_send_frames(void) const {return _num_send_frames;} size_t get_send_frame_size(void) const {return _send_frame_size;} + //! Read back the socket's buffer space reserved for receives + size_t get_recv_buff_size(void) { + int recv_buff_size = 0; + int opt_len = sizeof(recv_buff_size); + getsockopt( + _sock_fd, + SOL_SOCKET, + SO_RCVBUF, + (char *)&recv_buff_size, + (int *)&opt_len + ); + + return (size_t) recv_buff_size; + } + + //! Read back the socket's buffer space reserved for sends + size_t get_send_buff_size(void) { + int send_buff_size = 0; + int opt_len = sizeof(send_buff_size); + getsockopt( + _sock_fd, + SOL_SOCKET, + SO_SNDBUF, + (char *)&send_buff_size, + (int *)&opt_len + ); + + return (size_t) send_buff_size; + } + private: //memory management -> buffers and fifos const size_t _recv_frame_size, _num_recv_frames; @@ -292,6 +322,25 @@ private: /*********************************************************************** * UDP zero copy make function **********************************************************************/ +void check_usr_buff_size( + size_t actual_buff_size, + size_t user_buff_size, // Set this to zero for no user-defined preference + const std::string tx_rx +){ + UHD_LOG << boost::format( + "Target %s sock buff size: %d bytes\n" + "Actual %s sock buff size: %d bytes" + ) % tx_rx % user_buff_size % tx_rx % actual_buff_size << std::endl; + if ((user_buff_size != 0.0) and (actual_buff_size < user_buff_size)) UHD_MSG(warning) << boost::format( + "The %s buffer could not be resized sufficiently.\n" + "Target sock buff size: %d bytes.\n" + "Actual sock buff size: %d bytes.\n" + "See the transport application notes on buffer resizing.\n" + ) % tx_rx % user_buff_size % actual_buff_size; +} + + + udp_zero_copy::sptr udp_zero_copy::make( const std::string &addr, const std::string &port, @@ -306,6 +355,34 @@ udp_zero_copy::sptr udp_zero_copy::make( xport_params.num_recv_frames = size_t(hints.cast<double>("num_recv_frames", default_buff_args.num_recv_frames)); xport_params.send_frame_size = size_t(hints.cast<double>("send_frame_size", default_buff_args.send_frame_size)); xport_params.num_send_frames = size_t(hints.cast<double>("num_send_frames", default_buff_args.num_send_frames)); - - return sptr(new udp_zero_copy_wsa_impl(addr, port, xport_params, hints)); + + //extract buffer size hints from the device addr and check if they match up + size_t usr_recv_buff_size = size_t(hints.cast<double>("recv_buff_size", 0.0)); + size_t usr_send_buff_size = size_t(hints.cast<double>("send_buff_size", 0.0)); + if (hints.has_key("recv_buff_size")) { + if (usr_recv_buff_size < xport_params.recv_frame_size * xport_params.num_recv_frames) { + throw uhd::value_error((boost::format( + "recv_buff_size must be equal to or greater than (num_recv_frames * recv_frame_size) where num_recv_frames=%d, recv_frame_size=%d") + % xport_params.num_recv_frames % xport_params.recv_frame_size).str()); + } + } + if (hints.has_key("send_buff_size")) { + if (usr_send_buff_size < xport_params.send_frame_size * xport_params.num_send_frames) { + throw uhd::value_error((boost::format( + "send_buff_size must be equal to or greater than (num_send_frames * send_frame_size) where num_send_frames=%d, send_frame_size=%d") + % xport_params.num_send_frames % xport_params.send_frame_size).str()); + } + } + + udp_zero_copy_wsa_impl::sptr udp_trans( + new udp_zero_copy_wsa_impl(addr, port, xport_params, hints) + ); + + // Read back the actual socket buffer sizes + buff_params_out.recv_buff_size = udp_trans->get_recv_buff_size(); + buff_params_out.send_buff_size = udp_trans->get_send_buff_size(); + check_usr_buff_size(buff_params_out.recv_buff_size, usr_recv_buff_size, "recv"); + check_usr_buff_size(buff_params_out.send_buff_size, usr_send_buff_size, "send"); + + return udp_trans; } |