diff options
-rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 68 |
1 files changed, 44 insertions, 24 deletions
diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index c1bfd1606..aa84dcab0 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -31,6 +31,8 @@ #include "../../rfnoc/tx_stream_terminator.hpp" #include <uhd/rfnoc/rate_node_ctrl.hpp> #include <uhd/rfnoc/radio_ctrl.hpp> +#include <uhd/transport/zero_copy_flow_ctrl.hpp> +#include <boost/atomic.hpp> #define UHD_STREAMER_LOG() UHD_LOGV(never) @@ -304,12 +306,13 @@ struct tx_fc_cache_t device_channel(0), last_seq_out(0), last_seq_ack(0), - seq_queue(1){} + last_seq_ack_cache(0) {} + size_t stream_channel; size_t device_channel; size_t last_seq_out; - size_t last_seq_ack; - uhd::transport::bounded_buffer<size_t> seq_queue; + boost::atomic_size_t last_seq_ack; + size_t last_seq_ack_cache; boost::shared_ptr<device3_impl::async_md_type> async_queue; boost::shared_ptr<device3_impl::async_md_type> old_async_queue; }; @@ -337,33 +340,43 @@ static size_t get_tx_flow_control_window( return window_in_pkts; } -static managed_send_buffer::sptr get_tx_buff_with_flowctrl( - task::sptr /*holds ref*/, - boost::shared_ptr<tx_fc_cache_t> fc_cache, +// TODO: Remove this function +// This function only exists to make sure the transport is not destroyed +// until it is no longer needed. +static managed_send_buffer::sptr get_tx_buff( zero_copy_if::sptr xport, - size_t fc_window, const double timeout ){ + return xport->get_send_buff(timeout); +} + +static bool tx_flow_ctrl( + task::sptr /*holds ref*/, + boost::shared_ptr<tx_fc_cache_t> fc_cache, + size_t fc_window, + managed_buffer::sptr +) { + // Busy loop waiting for flow control update. This is necessary because + // at this point there is data trying to be sent and it must be sent as + // quickly as possible when the flow control update arrives to avoid + // underruns at high rates. This is also OK because it only occurs when + // data needs to be sent and flow control is holding it back. while (true) { // delta is the amount of FC credit we've used up - const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - (fc_cache->last_seq_ack & HW_SEQ_NUM_MASK); + const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - + (fc_cache->last_seq_ack_cache & HW_SEQ_NUM_MASK); // If we want to send another packet, we must have FC credit left if ((delta & HW_SEQ_NUM_MASK) < fc_window) - break; - - // If credit is all used up, we check seq_queue for more. - const bool ok = fc_cache->seq_queue.pop_with_timed_wait(fc_cache->last_seq_ack, timeout); - if (not ok) { - return managed_send_buffer::sptr(); //timeout waiting for flow control + { + // Packet will be sent + fc_cache->last_seq_out++; //update seq + return true; } + // update the cached value from the atomic + fc_cache->last_seq_ack_cache = fc_cache->last_seq_ack; } - - managed_send_buffer::sptr buff = xport->get_send_buff(timeout); - if (buff) { - fc_cache->last_seq_out++; //update seq, this will actually be a send - } - return buff; + return false; } #define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 @@ -380,7 +393,9 @@ static void handle_tx_async_msgs( ) { managed_recv_buffer::sptr buff = xport->get_recv_buff(); if (not buff) + { return; + } //extract packet info vrt::if_packet_info_t if_packet_info; @@ -429,8 +444,7 @@ static void handle_tx_async_msgs( //The FC response and the burst ack are two indicators that the radio //consumed packets. Use them to update the FC metadata if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { - const size_t seq = metadata.user_payload[0]; - fc_cache->seq_queue.push_with_pop_on_full(seq); + fc_cache->last_seq_ack = metadata.user_payload[0]; } //FC responses don't propagate up to the user so filter them here @@ -841,13 +855,19 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) } } + // Add flow control + xport.send = zero_copy_flow_ctrl::make( + xport.send, + boost::bind(&tx_flow_ctrl, task, fc_cache, fc_window, _1), + NULL); + //Give the streamer a functor to get the send buffer - //get_tx_buff_with_flowctrl is static so bind has no lifetime issues + //get_tx_buff is static so bind has no lifetime issues //xport.send (sptr) is required to add streamer->data-transport lifetime dependency //task (sptr) is required to add a streamer->async-handler lifetime dependency my_streamer->set_xport_chan_get_buff( stream_i, - boost::bind(&get_tx_buff_with_flowctrl, task, fc_cache, xport.send, fc_window, _1) + boost::bind(&get_tx_buff, xport.send, _1) ); //Give the streamer a functor handled received async messages my_streamer->set_async_receiver( |