diff options
Diffstat (limited to 'host/lib/usrp/device3')
-rw-r--r-- | host/lib/usrp/device3/device3_impl.hpp | 1 | ||||
-rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 214 |
2 files changed, 111 insertions, 104 deletions
diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp index c2ec26f80..196d1fd4e 100644 --- a/host/lib/usrp/device3/device3_impl.hpp +++ b/host/lib/usrp/device3/device3_impl.hpp @@ -56,6 +56,7 @@ public: //! The purpose of a transport enum xport_type_t { CTRL = 0, + ASYNC_MSG, TX_DATA, RX_DATA }; diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index 1668846c2..374232972 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -299,23 +299,17 @@ static void handle_rx_flowctrl( /*********************************************************************** * TX Flow Control Functions **********************************************************************/ +#define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 + //! Stores the state of TX flow control struct tx_fc_cache_t { - tx_fc_cache_t(void): - stream_channel(0), - device_channel(0), - last_seq_out(0), + tx_fc_cache_t(size_t capacity): last_seq_ack(0), - last_seq_ack_cache(0) {} + space(capacity) {} - size_t stream_channel; - size_t device_channel; - size_t last_seq_out; - boost::atomic_size_t last_seq_ack; - size_t last_seq_ack_cache; - boost::shared_ptr<device3_impl::async_md_type> async_queue; - boost::shared_ptr<device3_impl::async_md_type> old_async_queue; + size_t last_seq_ack; + size_t space; }; /*! Return the size of the flow control window in packets. @@ -341,79 +335,77 @@ static size_t get_tx_flow_control_window( return window_in_pkts; } -// TODO: Remove this function -// This function only exists to make sure the transport is not destroyed -// until it is no longer needed. -static managed_send_buffer::sptr get_tx_buff( - zero_copy_if::sptr xport, - const double timeout -){ - return xport->get_send_buff(timeout); -} - static bool tx_flow_ctrl( - task::sptr /*holds ref*/, boost::shared_ptr<tx_fc_cache_t> fc_cache, - size_t fc_window, + zero_copy_if::sptr async_xport, + uint32_t (*endian_conv)(uint32_t), + void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), managed_buffer::sptr ) { - bool refresh_cache = false; - - // Busy loop waiting for flow control update. This is necessary because - // at this point there is data trying to be sent and it must be sent as - // quickly as possible when the flow control update arrives to avoid - // underruns at high rates. This is also OK because it only occurs when - // data needs to be sent and flow control is holding it back. while (true) { - if (refresh_cache) + // If there is space + if (fc_cache->space) { - // update the cached value from the atomic - fc_cache->last_seq_ack_cache = fc_cache->last_seq_ack; - } - - // delta is the amount of FC credit we've used up - const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - - (fc_cache->last_seq_ack_cache & HW_SEQ_NUM_MASK); - // If we want to send another packet, we must have FC credit left - if ((delta & HW_SEQ_NUM_MASK) < fc_window) - { - // Packet will be sent - fc_cache->last_seq_out++; //update seq + // All is good - packet will be sent + fc_cache->space--; return true; } - else + + // Look for a flow control message to update the space available in the buffer. + // A minimal timeout is used because larger timeouts can cause the thread to be + // scheduled out for too long at high data rates and result in underruns. + managed_recv_buffer::sptr buff = async_xport->get_recv_buff(0.000001); + if (buff) { - if (refresh_cache) + vrt::if_packet_info_t if_packet_info; + if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); + const uint32_t *packet_buff = buff->cast<const uint32_t *>(); + try { + unpack(packet_buff, if_packet_info); + } + catch(const std::exception &ex) { - // We have already refreshed the cache and still - // lack flow control permission to send new data. - - // A true busy loop choked out the message handler - // thread on machines with processor limitations - // (too few cores). Yield to allow flow control - // receiver thread to operate. - boost::this_thread::yield(); + UHD_LOG_ERROR("TX FLOW CTRL", "Error unpacking async flow control packet: " << ex.what()); + continue; } - else + + if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC) { - // Allow the cache to refresh and try again to - // see if the device has granted flow control permission. - refresh_cache = true; + UHD_LOG_ERROR( + "TX FLOW CTRL", + "Unexpected packet type received by flow control handler: " << if_packet_info.packet_type + ); + continue; } + + // update the amount of space + size_t seq_ack = endian_conv(packet_buff[if_packet_info.num_header_words32+1]); + fc_cache->space += (seq_ack - fc_cache->last_seq_ack) & HW_SEQ_NUM_MASK; + fc_cache->last_seq_ack = seq_ack; } } return false; } -#define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 -/*! Handle incoming messages. If they're flow control, update the TX FC cache. - * Otherwise, send them to the async message queue for the user to poll. +/*********************************************************************** + * TX Async Message Functions + **********************************************************************/ +struct async_tx_info_t +{ + size_t stream_channel; + size_t device_channel; + boost::shared_ptr<device3_impl::async_md_type> async_queue; + boost::shared_ptr<device3_impl::async_md_type> old_async_queue; +}; + +/*! Handle incoming messages. + * Send them to the async message queue for the user to poll. * * This is run inside a uhd::task as long as this streamer lives. */ static void handle_tx_async_msgs( - boost::shared_ptr<tx_fc_cache_t> fc_cache, + boost::shared_ptr<async_tx_info_t> async_info, zero_copy_if::sptr xport, endianness_t endianness, boost::function<double(void)> get_tick_rate @@ -463,31 +455,24 @@ static void handle_tx_async_msgs( if_packet_info, packet_buff, tick_rate, - fc_cache->stream_channel + async_info->stream_channel ); - // TODO: Shouldn't we be polling if_packet_info.packet_type == PACKET_TYPE_FC? - // Thing is, on X300, packet_type == 0, so that wouldn't work. But it seems it should. - //The FC response and the burst ack are two indicators that the radio - //consumed packets. Use them to update the FC metadata - if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { - fc_cache->last_seq_ack = metadata.user_payload[0]; - } - - //FC responses don't propagate up to the user so filter them here - if (metadata.event_code != DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { - fc_cache->async_queue->push_with_pop_on_full(metadata); - metadata.channel = fc_cache->device_channel; - fc_cache->old_async_queue->push_with_pop_on_full(metadata); + // Filter out any flow control messages and cache the rest + if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) + { + UHD_LOG_ERROR( + "TX ASYNC", + "Unexpected flow control message found in async message handling" + ); + } else { + async_info->async_queue->push_with_pop_on_full(metadata); + metadata.channel = async_info->device_channel; + async_info->old_async_queue->push_with_pop_on_full(metadata); standard_async_msg_prints(metadata); } } - - -/*********************************************************************** - * Async Data - **********************************************************************/ bool device3_impl::recv_async_msg( async_metadata_t &async_metadata, double timeout ) @@ -727,6 +712,20 @@ void device3_impl::update_tx_streamers(double /* rate */) } } +// This class manages the lifetime of the TX async message handler task and transports +class device3_send_packet_streamer : public sph::send_packet_streamer +{ +public: + device3_send_packet_streamer(const size_t max_num_samps) : sph::send_packet_streamer(max_num_samps) {}; + ~device3_send_packet_streamer() { + _tx_async_msg_task.reset(); // Make sure the async task is destroyed before the transports + }; + + both_xports_t _xport; + both_xports_t _async_xport; + task::sptr _tx_async_msg_task; +}; + tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) { boost::mutex::scoped_lock lock(_transport_setup_mutex); @@ -742,7 +741,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) boost::shared_ptr<async_md_type> async_md(new async_md_type(1000/*messages deep*/)); // II. Iterate over all channels - boost::shared_ptr<sph::send_packet_streamer> my_streamer; + boost::shared_ptr<device3_send_packet_streamer> my_streamer; // The terminator's lifetime is coupled to the streamer. // There is only one terminator. If the streamer has multiple channels, // it will be connected to each downstream block. @@ -780,7 +779,8 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) uhd::sid_t stream_address = blk_ctrl->get_address(block_port); UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ; both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints); - UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ; + both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t("")); + UHD_TX_STREAMER_LOG() << std::hex << "[TX Streamer] data_sid = " << xport.send_sid << std::dec << std::endl; // To calculate the max number of samples per packet, we assume the maximum header length // to avoid fragmentation should the entire header be used. @@ -791,8 +791,10 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) //make the new streamer given the samples per packet if (not my_streamer) - my_streamer = boost::make_shared<sph::send_packet_streamer>(spp); + my_streamer = boost::make_shared<device3_send_packet_streamer>(spp); my_streamer->resize(chan_list.size()); + my_streamer->_xport = xport; + my_streamer->_async_xport = async_xport; //init some streamer stuff std::string conv_endianness; @@ -828,29 +830,30 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) block_port ); - boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t()); - fc_cache->stream_channel = stream_i; - fc_cache->device_channel = mb_index; - fc_cache->async_queue = async_md; - fc_cache->old_async_queue = _async_md; + boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); + async_tx_info->stream_channel = args.channels[stream_i]; + async_tx_info->device_channel = mb_index; + async_tx_info->async_queue = async_md; + async_tx_info->old_async_queue = _async_md; boost::function<double(void)> tick_rate_retriever = boost::bind( &rfnoc::tick_node_ctrl::get_tick_rate, send_terminator, std::set< rfnoc::node_ctrl_base::sptr >() // Need to specify default args with bind ); - task::sptr task = task::make( + + my_streamer->_tx_async_msg_task = task::make( boost::bind( &handle_tx_async_msgs, - fc_cache, - xport.recv, + async_tx_info, + my_streamer->_async_xport.recv, xport.endianness, tick_rate_retriever ) ); blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0xc1ea12, block_port); - blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, xport.recv_sid.get_dst(), block_port); + blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ; // FIXME: Once there is a better way to map the radio block and port @@ -865,7 +868,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) { if (node->get_block_id() == radio_id) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, xport.send_sid.get_src(), radio_port); + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), radio_port); } } } else { @@ -878,24 +881,27 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, xport.send_sid.get_src(), block_port); + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port); } } // Add flow control - xport.send = zero_copy_flow_ctrl::make( - xport.send, - boost::bind(&tx_flow_ctrl, task, fc_cache, fc_window, _1), - 0 - ); + boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); + my_streamer->_xport.send = zero_copy_flow_ctrl::make( + my_streamer->_xport.send, + boost::bind( + &tx_flow_ctrl, + fc_cache, + my_streamer->_xport.recv, + (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>), + (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le), + _1), + NULL); //Give the streamer a functor to get the send buffer - //get_tx_buff is static so bind has no lifetime issues - //xport.send (sptr) is required to add streamer->data-transport lifetime dependency - //task (sptr) is required to add a streamer->async-handler lifetime dependency my_streamer->set_xport_chan_get_buff( stream_i, - boost::bind(&get_tx_buff, xport.send, _1) + boost::bind(&zero_copy_if::get_send_buff, my_streamer->_xport.send, _1) ); //Give the streamer a functor handled received async messages my_streamer->set_async_receiver( |