diff options
Diffstat (limited to 'host/lib/usrp/device3')
-rw-r--r-- | host/lib/usrp/device3/device3_impl.hpp | 18 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 163 |
2 files changed, 92 insertions, 89 deletions
diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp index 580268e4a..11487b54c 100644 --- a/host/lib/usrp/device3/device3_impl.hpp +++ b/host/lib/usrp/device3/device3_impl.hpp @@ -56,13 +56,13 @@ public: _terminator(terminator), _data_xport(data_xport), _async_msg_xport(async_msg_xport) - {}; + {} ~device3_send_packet_streamer() { // Make sure the async task is destroyed before the transports _tx_async_msg_tasks.clear(); - }; + } uhd::rfnoc::tx_stream_terminator::sptr get_terminator() { @@ -92,9 +92,9 @@ public: ) : uhd::transport::sph::recv_packet_streamer(max_num_samps), _terminator(terminator), - _xport(xport) {}; + _xport(xport) {} - ~device3_recv_packet_streamer() {}; + ~device3_recv_packet_streamer() {} both_xports_t get_xport() { @@ -145,7 +145,7 @@ public: , rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN) , rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ) , tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ) - {}; + {} }; /*********************************************************************** @@ -165,7 +165,7 @@ protected: * Structors **********************************************************************/ device3_impl(); - virtual ~device3_impl() {}; + virtual ~device3_impl() {} /*********************************************************************** * Streaming-related @@ -196,11 +196,11 @@ protected: const uhd::device_addr_t& args ) = 0; - virtual uhd::device_addr_t get_tx_hints(size_t) { return uhd::device_addr_t(); }; - virtual uhd::device_addr_t get_rx_hints(size_t) { return uhd::device_addr_t(); }; + virtual uhd::device_addr_t get_tx_hints(size_t) { return uhd::device_addr_t(); } + virtual uhd::device_addr_t get_rx_hints(size_t) { return uhd::device_addr_t(); } //! Is called after a streamer is generated - virtual void post_streamer_hooks(uhd::direction_t) {}; + virtual void post_streamer_hooks(uhd::direction_t) {} /*********************************************************************** * Channel-related diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index e4c38aca6..490f10c3c 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -158,7 +158,10 @@ struct rx_fc_cache_t uint64_t seq_num; sid_t sid; zero_copy_if::sptr xport; - endianness_t endianness; + std::function<uint32_t(uint32_t)> to_host; + std::function<uint32_t(uint32_t)> from_host; + std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack; + std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack; }; /*! Determine the size of the flow control window in number of packets. @@ -218,8 +221,8 @@ static size_t get_rx_flow_control_window( * skip the counter update. */ static bool rx_flow_ctrl( - boost::shared_ptr<rx_fc_cache_t> fc_cache, - managed_buffer::sptr buff + boost::shared_ptr<rx_fc_cache_t> fc_cache, + managed_buffer::sptr buff ) { // If the caller supplied a buffer if (buff) @@ -229,17 +232,12 @@ static bool rx_flow_ctrl( packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); const uint32_t *pkt = buff->cast<const uint32_t *>(); try { - if (fc_cache->endianness == ENDIANNESS_BIG) - { - vrt::chdr::if_hdr_unpack_be(pkt, packet_info); - } else { - vrt::chdr::if_hdr_unpack_le(pkt, packet_info); - } + fc_cache->unpack(pkt, packet_info); } catch(const std::exception &ex) { // Log and ignore - UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; + UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl; return true; } @@ -282,23 +280,13 @@ static bool rx_flow_ctrl( packet_info.has_tsf = false; packet_info.has_tlr = false; - if (fc_cache->endianness == ENDIANNESS_BIG) { - // Load Header: - vrt::chdr::if_hdr_pack_be(pkt, packet_info); - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = - uhd::htonx<uint32_t>(fc_cache->total_packets_consumed); - pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = - uhd::htonx<uint32_t>(fc_cache->total_bytes_consumed); - } else { - // Load Header: - vrt::chdr::if_hdr_pack_le(pkt, packet_info); - // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = - uhd::htowx<uint32_t>(fc_cache->total_packets_consumed); - pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = - uhd::htowx<uint32_t>(fc_cache->total_bytes_consumed); - } + // Load Header: + fc_cache->pack(pkt, packet_info); + // Load Payload: Packet count, and byte count + pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = + fc_cache->from_host(fc_cache->total_packets_consumed); + pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = + fc_cache->from_host(fc_cache->total_bytes_consumed); //send the buffer over the interface fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); @@ -313,15 +301,11 @@ static bool rx_flow_ctrl( * */ static void handle_rx_flowctrl_ack( - boost::shared_ptr<rx_fc_cache_t> fc_cache, - const uint32_t *payload + boost::shared_ptr<rx_fc_cache_t> fc_cache, + const uint32_t *payload ) { - const uint32_t pkt_count = (fc_cache->endianness == ENDIANNESS_BIG) ? - uhd::ntohx<uint32_t>(payload[0]) : - uhd::wtohx<uint32_t>(payload[0]); - const uint32_t byte_count = (fc_cache->endianness == ENDIANNESS_BIG) ? - uhd::ntohx<uint32_t>(payload[1]) : - uhd::wtohx<uint32_t>(payload[1]); + const uint32_t pkt_count = fc_cache->to_host(payload[0]); + const uint32_t byte_count = fc_cache->to_host(payload[1]); if (fc_cache->total_bytes_consumed != byte_count) { UHD_LOGGER_DEBUG("device3") @@ -362,13 +346,15 @@ struct tx_fc_cache_t uint32_t window_size; uint32_t fc_ack_seqnum; bool fc_received; + std::function<uint32_t(uint32_t)> to_host; + std::function<uint32_t(uint32_t)> from_host; + std::function<void(const uint32_t *packet_buff, vrt::if_packet_info_t &)> unpack; + std::function<void(uint32_t *packet_buff, vrt::if_packet_info_t &)> pack; }; static bool tx_flow_ctrl( boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr xport, - uint32_t (*to_host)(uint32_t), - void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), + zero_copy_if::sptr xport, managed_buffer::sptr buff ) { while (true) @@ -395,7 +381,7 @@ static bool tx_flow_ctrl( if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); const uint32_t *packet_buff = buff->cast<const uint32_t *>(); try { - unpack(packet_buff, if_packet_info); + fc_cache->unpack(packet_buff, if_packet_info); } catch(const std::exception &ex) { @@ -410,8 +396,8 @@ static bool tx_flow_ctrl( } const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; - const uint32_t pkt_count = to_host(payload[0]); - const uint32_t byte_count = to_host(payload[1]); + const uint32_t pkt_count = fc_cache->to_host(payload[0]); + const uint32_t byte_count = fc_cache->to_host(payload[1]); // update the amount of space fc_cache->last_byte_ack = byte_count; @@ -426,9 +412,7 @@ static bool tx_flow_ctrl( static void tx_flow_ctrl_ack( boost::shared_ptr<tx_fc_cache_t> fc_cache, zero_copy_if::sptr send_xport, - sid_t send_sid, - uint32_t (*from_host)(uint32_t), - void (*pack)(uint32_t *packet_buff, vrt::if_packet_info_t &) + sid_t send_sid ) { if (not fc_cache->fc_received) { @@ -462,7 +446,7 @@ static void tx_flow_ctrl_ack( packet_info.has_tlr = false; // Load Header: - pack(pkt, packet_info); + fc_cache->pack(pkt, packet_info); // Update counters to include this packet size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); @@ -476,9 +460,9 @@ static void tx_flow_ctrl_ack( // Load Payload: Packet count, and byte count pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = - from_host(fc_cache->pkt_count); + fc_cache->from_host(fc_cache->pkt_count); pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = - from_host(fc_cache->byte_count); + fc_cache->from_host(fc_cache->byte_count); // Send the buffer over the interface fc_buff->commit(fc_ack_pkt_size); @@ -506,7 +490,8 @@ struct async_tx_info_t static void handle_tx_async_msgs( boost::shared_ptr<async_tx_info_t> async_info, zero_copy_if::sptr xport, - endianness_t endianness, + uint32_t (*to_host)(uint32_t), + void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), boost::function<double(void)> get_tick_rate ) { managed_recv_buffer::sptr buff = xport->get_recv_buff(); @@ -521,19 +506,9 @@ static void handle_tx_async_msgs( const uint32_t *packet_buff = buff->cast<const uint32_t *>(); //unpacking can fail - uint32_t (*endian_conv)(uint32_t) = uhd::ntohx; try { - if (endianness == ENDIANNESS_BIG) - { - vrt::chdr::if_hdr_unpack_be(packet_buff, if_packet_info); - endian_conv = uhd::ntohx; - } - else - { - vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info); - endian_conv = uhd::wtohx; - } + unpack(packet_buff, if_packet_info); } catch(const std::exception &ex) { @@ -549,7 +524,7 @@ static void handle_tx_async_msgs( //fill in the async metadata async_metadata_t metadata; load_metadata_from_buff( - endian_conv, + to_host, metadata, if_packet_info, packet_buff, @@ -681,14 +656,29 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); fc_cache->sid = xport.send_sid; fc_cache->xport = xport.send; - fc_cache->endianness = xport.endianness; fc_cache->interval = fc_handle_window; + if (xport.endianness == ENDIANNESS_BIG) + { + fc_cache->to_host = uhd::ntohx<uint32_t>; + fc_cache->from_host = uhd::htonx<uint32_t>; + fc_cache->pack = vrt::chdr::if_hdr_pack_be; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; + } + else + { + fc_cache->to_host = uhd::wtohx<uint32_t>; + fc_cache->from_host = uhd::htowx<uint32_t>; + fc_cache->pack = vrt::chdr::if_hdr_pack_le; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; + } xport.recv = zero_copy_flow_ctrl::make ( xport.recv, NULL, - [=](managed_buffer::sptr buff) { - return rx_flow_ctrl(fc_cache, buff); + [fc_cache](managed_buffer::sptr buff) { + return rx_flow_ctrl( + fc_cache, + buff); } ); @@ -753,7 +743,7 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) // Give the streamer a functor to handle flow control ACK messages my_streamer->set_xport_handle_flowctrl_ack( stream_i, - [=](const uint32_t *payload) { + [fc_cache](const uint32_t *payload) { handle_rx_flowctrl_ack( fc_cache, payload @@ -764,7 +754,9 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) //Give the streamer a functor to get the recv_buffer my_streamer->set_xport_chan_get_buff( stream_i, - [=](double timeout) {return xport.recv->get_recv_buff(timeout);}, + [xport](double timeout) { + return xport.recv->get_recv_buff(timeout); + }, true /*flush*/ ); @@ -774,7 +766,7 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer); my_streamer->set_overflow_handler( stream_i, - [=]() { + [recv_terminator, weak_ptr, stream_i]() { recv_terminator->handle_overrun( weak_ptr, stream_i); @@ -784,7 +776,9 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) //Give the streamer a functor issue stream cmd my_streamer->set_issue_stream_cmd( stream_i, - [=](const stream_cmd_t& stream_cmd) {blk_ctrl->issue_stream_cmd(stream_cmd, block_port);} + [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) { + blk_ctrl->issue_stream_cmd(stream_cmd, block_port); + } ); } @@ -913,14 +907,24 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) ); // Add flow control transport boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); + if (xport.endianness == ENDIANNESS_BIG) + { + fc_cache->to_host = uhd::ntohx<uint32_t>; + fc_cache->from_host = uhd::htonx<uint32_t>; + fc_cache->pack = vrt::chdr::if_hdr_pack_be; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; + } else { + fc_cache->to_host = uhd::wtohx<uint32_t>; + fc_cache->from_host = uhd::htowx<uint32_t>; + fc_cache->pack = vrt::chdr::if_hdr_pack_le; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; + } xport.send = zero_copy_flow_ctrl::make( xport.send, - [=](managed_buffer::sptr buff) { + [fc_cache, xport](managed_buffer::sptr buff) { return tx_flow_ctrl( fc_cache, xport.recv, - (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), - (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), buff); }, NULL @@ -1004,12 +1008,13 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) async_tx_info->old_async_queue = _async_md; task::sptr async_task = task::make( - [=]() { + [async_tx_info, async_xport, xport, send_terminator]() { handle_tx_async_msgs( async_tx_info, async_xport.recv, - xport.endianness, - [=]() {return send_terminator->get_tick_rate();} + xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>, + xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le, + [send_terminator]() {return send_terminator->get_tick_rate();} ); } ); @@ -1018,13 +1023,13 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) //Give the streamer a functor to get the send buffer my_streamer->set_xport_chan_get_buff( stream_i, - [=](const double timeout) { + [xport](const double timeout) { return xport.send->get_send_buff(timeout); } ); //Give the streamer a functor handled received async messages my_streamer->set_async_receiver( - [=](uhd::async_metadata_t& md, const double timeout) { + [async_md](uhd::async_metadata_t& md, const double timeout) { return async_md->pop_with_timed_wait(md, timeout); } ); @@ -1034,15 +1039,13 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) my_streamer->set_xport_chan_post_send_cb( stream_i, - [=]() { + [fc_cache, xport]() { tx_flow_ctrl_ack( fc_cache, xport.send, - xport.send_sid, - (xport.endianness == ENDIANNESS_BIG ? uhd::htonx<uint32_t> : uhd::htowx<uint32_t>), - (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_pack_be : vrt::chdr::if_hdr_pack_le) + xport.send_sid ); - } + } ); } |