aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/device3
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/usrp/device3')
-rw-r--r--host/lib/usrp/device3/device3_impl.hpp1
-rw-r--r--host/lib/usrp/device3/device3_io_impl.cpp214
2 files changed, 111 insertions, 104 deletions
diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp
index c2ec26f80..196d1fd4e 100644
--- a/host/lib/usrp/device3/device3_impl.hpp
+++ b/host/lib/usrp/device3/device3_impl.hpp
@@ -56,6 +56,7 @@ public:
//! The purpose of a transport
enum xport_type_t {
CTRL = 0,
+ ASYNC_MSG,
TX_DATA,
RX_DATA
};
diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp
index 1668846c2..374232972 100644
--- a/host/lib/usrp/device3/device3_io_impl.cpp
+++ b/host/lib/usrp/device3/device3_io_impl.cpp
@@ -299,23 +299,17 @@ static void handle_rx_flowctrl(
/***********************************************************************
* TX Flow Control Functions
**********************************************************************/
+#define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0
+
//! Stores the state of TX flow control
struct tx_fc_cache_t
{
- tx_fc_cache_t(void):
- stream_channel(0),
- device_channel(0),
- last_seq_out(0),
+ tx_fc_cache_t(size_t capacity):
last_seq_ack(0),
- last_seq_ack_cache(0) {}
+ space(capacity) {}
- size_t stream_channel;
- size_t device_channel;
- size_t last_seq_out;
- boost::atomic_size_t last_seq_ack;
- size_t last_seq_ack_cache;
- boost::shared_ptr<device3_impl::async_md_type> async_queue;
- boost::shared_ptr<device3_impl::async_md_type> old_async_queue;
+ size_t last_seq_ack;
+ size_t space;
};
/*! Return the size of the flow control window in packets.
@@ -341,79 +335,77 @@ static size_t get_tx_flow_control_window(
return window_in_pkts;
}
-// TODO: Remove this function
-// This function only exists to make sure the transport is not destroyed
-// until it is no longer needed.
-static managed_send_buffer::sptr get_tx_buff(
- zero_copy_if::sptr xport,
- const double timeout
-){
- return xport->get_send_buff(timeout);
-}
-
static bool tx_flow_ctrl(
- task::sptr /*holds ref*/,
boost::shared_ptr<tx_fc_cache_t> fc_cache,
- size_t fc_window,
+ zero_copy_if::sptr async_xport,
+ uint32_t (*endian_conv)(uint32_t),
+ void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &),
managed_buffer::sptr
) {
- bool refresh_cache = false;
-
- // Busy loop waiting for flow control update. This is necessary because
- // at this point there is data trying to be sent and it must be sent as
- // quickly as possible when the flow control update arrives to avoid
- // underruns at high rates. This is also OK because it only occurs when
- // data needs to be sent and flow control is holding it back.
while (true)
{
- if (refresh_cache)
+ // If there is space
+ if (fc_cache->space)
{
- // update the cached value from the atomic
- fc_cache->last_seq_ack_cache = fc_cache->last_seq_ack;
- }
-
- // delta is the amount of FC credit we've used up
- const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) -
- (fc_cache->last_seq_ack_cache & HW_SEQ_NUM_MASK);
- // If we want to send another packet, we must have FC credit left
- if ((delta & HW_SEQ_NUM_MASK) < fc_window)
- {
- // Packet will be sent
- fc_cache->last_seq_out++; //update seq
+ // All is good - packet will be sent
+ fc_cache->space--;
return true;
}
- else
+
+ // Look for a flow control message to update the space available in the buffer.
+ // A minimal timeout is used because larger timeouts can cause the thread to be
+ // scheduled out for too long at high data rates and result in underruns.
+ managed_recv_buffer::sptr buff = async_xport->get_recv_buff(0.000001);
+ if (buff)
{
- if (refresh_cache)
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
+ const uint32_t *packet_buff = buff->cast<const uint32_t *>();
+ try {
+ unpack(packet_buff, if_packet_info);
+ }
+ catch(const std::exception &ex)
{
- // We have already refreshed the cache and still
- // lack flow control permission to send new data.
-
- // A true busy loop choked out the message handler
- // thread on machines with processor limitations
- // (too few cores). Yield to allow flow control
- // receiver thread to operate.
- boost::this_thread::yield();
+ UHD_LOG_ERROR("TX FLOW CTRL", "Error unpacking async flow control packet: " << ex.what());
+ continue;
}
- else
+
+ if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC)
{
- // Allow the cache to refresh and try again to
- // see if the device has granted flow control permission.
- refresh_cache = true;
+ UHD_LOG_ERROR(
+ "TX FLOW CTRL",
+ "Unexpected packet type received by flow control handler: " << if_packet_info.packet_type
+ );
+ continue;
}
+
+ // update the amount of space
+ size_t seq_ack = endian_conv(packet_buff[if_packet_info.num_header_words32+1]);
+ fc_cache->space += (seq_ack - fc_cache->last_seq_ack) & HW_SEQ_NUM_MASK;
+ fc_cache->last_seq_ack = seq_ack;
}
}
return false;
}
-#define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0
-/*! Handle incoming messages. If they're flow control, update the TX FC cache.
- * Otherwise, send them to the async message queue for the user to poll.
+/***********************************************************************
+ * TX Async Message Functions
+ **********************************************************************/
+struct async_tx_info_t
+{
+ size_t stream_channel;
+ size_t device_channel;
+ boost::shared_ptr<device3_impl::async_md_type> async_queue;
+ boost::shared_ptr<device3_impl::async_md_type> old_async_queue;
+};
+
+/*! Handle incoming messages.
+ * Send them to the async message queue for the user to poll.
*
* This is run inside a uhd::task as long as this streamer lives.
*/
static void handle_tx_async_msgs(
- boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ boost::shared_ptr<async_tx_info_t> async_info,
zero_copy_if::sptr xport,
endianness_t endianness,
boost::function<double(void)> get_tick_rate
@@ -463,31 +455,24 @@ static void handle_tx_async_msgs(
if_packet_info,
packet_buff,
tick_rate,
- fc_cache->stream_channel
+ async_info->stream_channel
);
- // TODO: Shouldn't we be polling if_packet_info.packet_type == PACKET_TYPE_FC?
- // Thing is, on X300, packet_type == 0, so that wouldn't work. But it seems it should.
- //The FC response and the burst ack are two indicators that the radio
- //consumed packets. Use them to update the FC metadata
- if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) {
- fc_cache->last_seq_ack = metadata.user_payload[0];
- }
-
- //FC responses don't propagate up to the user so filter them here
- if (metadata.event_code != DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) {
- fc_cache->async_queue->push_with_pop_on_full(metadata);
- metadata.channel = fc_cache->device_channel;
- fc_cache->old_async_queue->push_with_pop_on_full(metadata);
+ // Filter out any flow control messages and cache the rest
+ if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL)
+ {
+ UHD_LOG_ERROR(
+ "TX ASYNC",
+ "Unexpected flow control message found in async message handling"
+ );
+ } else {
+ async_info->async_queue->push_with_pop_on_full(metadata);
+ metadata.channel = async_info->device_channel;
+ async_info->old_async_queue->push_with_pop_on_full(metadata);
standard_async_msg_prints(metadata);
}
}
-
-
-/***********************************************************************
- * Async Data
- **********************************************************************/
bool device3_impl::recv_async_msg(
async_metadata_t &async_metadata, double timeout
)
@@ -727,6 +712,20 @@ void device3_impl::update_tx_streamers(double /* rate */)
}
}
+// This class manages the lifetime of the TX async message handler task and transports
+class device3_send_packet_streamer : public sph::send_packet_streamer
+{
+public:
+ device3_send_packet_streamer(const size_t max_num_samps) : sph::send_packet_streamer(max_num_samps) {};
+ ~device3_send_packet_streamer() {
+ _tx_async_msg_task.reset(); // Make sure the async task is destroyed before the transports
+ };
+
+ both_xports_t _xport;
+ both_xports_t _async_xport;
+ task::sptr _tx_async_msg_task;
+};
+
tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
{
boost::mutex::scoped_lock lock(_transport_setup_mutex);
@@ -742,7 +741,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
boost::shared_ptr<async_md_type> async_md(new async_md_type(1000/*messages deep*/));
// II. Iterate over all channels
- boost::shared_ptr<sph::send_packet_streamer> my_streamer;
+ boost::shared_ptr<device3_send_packet_streamer> my_streamer;
// The terminator's lifetime is coupled to the streamer.
// There is only one terminator. If the streamer has multiple channels,
// it will be connected to each downstream block.
@@ -780,7 +779,8 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
uhd::sid_t stream_address = blk_ctrl->get_address(block_port);
UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ;
both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints);
- UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ;
+ both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t(""));
+ UHD_TX_STREAMER_LOG() << std::hex << "[TX Streamer] data_sid = " << xport.send_sid << std::dec << std::endl;
// To calculate the max number of samples per packet, we assume the maximum header length
// to avoid fragmentation should the entire header be used.
@@ -791,8 +791,10 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
//make the new streamer given the samples per packet
if (not my_streamer)
- my_streamer = boost::make_shared<sph::send_packet_streamer>(spp);
+ my_streamer = boost::make_shared<device3_send_packet_streamer>(spp);
my_streamer->resize(chan_list.size());
+ my_streamer->_xport = xport;
+ my_streamer->_async_xport = async_xport;
//init some streamer stuff
std::string conv_endianness;
@@ -828,29 +830,30 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
block_port
);
- boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t());
- fc_cache->stream_channel = stream_i;
- fc_cache->device_channel = mb_index;
- fc_cache->async_queue = async_md;
- fc_cache->old_async_queue = _async_md;
+ boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t());
+ async_tx_info->stream_channel = args.channels[stream_i];
+ async_tx_info->device_channel = mb_index;
+ async_tx_info->async_queue = async_md;
+ async_tx_info->old_async_queue = _async_md;
boost::function<double(void)> tick_rate_retriever = boost::bind(
&rfnoc::tick_node_ctrl::get_tick_rate,
send_terminator,
std::set< rfnoc::node_ctrl_base::sptr >() // Need to specify default args with bind
);
- task::sptr task = task::make(
+
+ my_streamer->_tx_async_msg_task = task::make(
boost::bind(
&handle_tx_async_msgs,
- fc_cache,
- xport.recv,
+ async_tx_info,
+ my_streamer->_async_xport.recv,
xport.endianness,
tick_rate_retriever
)
);
blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0xc1ea12, block_port);
- blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, xport.recv_sid.get_dst(), block_port);
+ blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port);
UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ;
// FIXME: Once there is a better way to map the radio block and port
@@ -865,7 +868,7 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size();
for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) {
if (node->get_block_id() == radio_id) {
- node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, xport.send_sid.get_src(), radio_port);
+ node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), radio_port);
}
}
} else {
@@ -878,24 +881,27 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>();
UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size();
for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node: downstream_radio_nodes) {
- node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, xport.send_sid.get_src(), block_port);
+ node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, my_streamer->_async_xport.recv_sid.get_dst(), block_port);
}
}
// Add flow control
- xport.send = zero_copy_flow_ctrl::make(
- xport.send,
- boost::bind(&tx_flow_ctrl, task, fc_cache, fc_window, _1),
- 0
- );
+ boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window));
+ my_streamer->_xport.send = zero_copy_flow_ctrl::make(
+ my_streamer->_xport.send,
+ boost::bind(
+ &tx_flow_ctrl,
+ fc_cache,
+ my_streamer->_xport.recv,
+ (xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>),
+ (xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le),
+ _1),
+ NULL);
//Give the streamer a functor to get the send buffer
- //get_tx_buff is static so bind has no lifetime issues
- //xport.send (sptr) is required to add streamer->data-transport lifetime dependency
- //task (sptr) is required to add a streamer->async-handler lifetime dependency
my_streamer->set_xport_chan_get_buff(
stream_i,
- boost::bind(&get_tx_buff, xport.send, _1)
+ boost::bind(&zero_copy_if::get_send_buff, my_streamer->_xport.send, _1)
);
//Give the streamer a functor handled received async messages
my_streamer->set_async_receiver(