aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-08-13 14:55:36 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:41 -0800
commitf5e726b0ad83743c173231d9ac019c917a51be07 (patch)
tree0ba29a81ffef116624492b0fe157a7c00a00ca65 /host/lib
parentc55c434425b9d585c5a76a1769206cccd55d9bc4 (diff)
downloaduhd-f5e726b0ad83743c173231d9ac019c917a51be07.tar.gz
uhd-f5e726b0ad83743c173231d9ac019c917a51be07.tar.bz2
uhd-f5e726b0ad83743c173231d9ac019c917a51be07.zip
rfnoc: Implement flushing on overrun
This modifies the overrun handling such that the RX streamer does not restart the radios until the packets that were buffered prior to the overrun are read by the user. When an RX streamer receives an overrun, it will run the following algorithm: 1. Stop all upstream producers. 2. Set an internal flag in the streamer that indicates that the producers have stopped due to an overrun. 3. Continue servicing calls to recv until it runs out of packets in the host buffer (packets that can be read from the transport using a 0 timeout). 4. Once the packets are exhausted, return an overrun error from recv. The radio, if it was in continuous streaming mode before the overrun, includes a flag in its initial action whether or not to restart streaming. 5. If the radio requested a restart, the streamer submits a restart request action upstream. This action will be received by the radio. The radio will then check the current time, and send a stream command action back downstream. 6. The RX streamer receives the stream command action, and uses it to send another stream command to all upstream producers. This way, all upstream producers receive a start command for the same time.
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp1
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp8
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp13
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp137
-rw-r--r--host/lib/rfnoc/radio_control_impl.cpp22
-rw-r--r--host/lib/rfnoc/rfnoc_rx_streamer.cpp49
6 files changed, 158 insertions, 72 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
index 5440c1e37..5327105c8 100644
--- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
+++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
@@ -324,7 +324,6 @@ private:
std::unordered_map<size_t, double> _rx_bandwidth;
std::vector<uhd::stream_cmd_t> _last_stream_cmd;
- std::vector<bool> _restart_cont;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
index 1d1d0805d..d39d88f43 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -10,8 +10,8 @@
#include <uhd/rfnoc/node.hpp>
#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <uhdlib/transport/rx_streamer_impl.hpp>
-#include <string>
#include <atomic>
+#include <string>
namespace uhd { namespace rfnoc {
@@ -75,16 +75,17 @@ public:
*/
bool check_topology(const std::vector<size_t>& connected_inputs,
const std::vector<size_t>& connected_outputs);
+
private:
void _register_props(const size_t chan, const std::string& otw_format);
void _handle_rx_event_action(
const res_source_info& src, rx_event_action_info::sptr rx_event_action);
- void _handle_restart_request(
- const res_source_info& src, action_info::sptr rx_event_action);
void _handle_stream_cmd_action(
const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action);
+ void _handle_overrun();
+
// Properties
std::vector<property_t<double>> _scaling_in;
std::vector<property_t<double>> _samp_rate_in;
@@ -98,6 +99,7 @@ private:
const uhd::stream_args_t _stream_args;
std::atomic<bool> _overrun_handling_mode{false};
+ size_t _overrun_channel = 0;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index d3fe97c7f..cc989e8f2 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -196,6 +196,19 @@ protected:
_zero_copy_streamer.set_tick_rate(rate);
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _zero_copy_streamer.set_stopped_due_to_overrun();
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(
+ typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler)
+ {
+ _zero_copy_streamer.set_overrun_handler(handler);
+ }
+
private:
//! Converter and associated item sizes
struct convert_info
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
index 36f568f2d..1f7320330 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -14,6 +14,7 @@
#include <uhdlib/transport/get_aligned_buffs.hpp>
#include <boost/format.hpp>
#include <vector>
+#include <atomic>
namespace uhd { namespace transport {
@@ -26,6 +27,8 @@ template <typename transport_t>
class rx_streamer_zero_copy
{
public:
+ using overrun_handler_t = std::function<void()>;
+
//! Constructor
rx_streamer_zero_copy(const size_t num_ports)
: _xports(num_ports)
@@ -84,6 +87,18 @@ public:
_bytes_per_item = bpi;
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _stopped_due_to_overrun = true;
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(overrun_handler_t handler)
+ {
+ _overrun_handler = handler;
+ }
+
/*!
* Gets a set of time-aligned buffers, one per channel.
*
@@ -96,35 +111,62 @@ public:
rx_metadata_t& metadata,
const int32_t timeout_ms)
{
- metadata.reset();
+ // Function to set metadata based on alignment error
+ auto set_metadata_for_error =
+ [this](typename get_aligned_buffs_t::alignment_result_t error,
+ rx_metadata_t& metadata) {
+ switch (error) {
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ break;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ break;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ break;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ break;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ };
- switch (_get_aligned_buffs(timeout_ms)) {
- case get_aligned_buffs_t::SUCCESS:
- break;
-
- case get_aligned_buffs_t::BAD_PACKET:
- metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
- return 0;
-
- case get_aligned_buffs_t::TIMEOUT:
- metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
- return 0;
-
- case get_aligned_buffs_t::ALIGNMENT_FAILURE:
- metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
- return 0;
+ metadata.reset();
- case get_aligned_buffs_t::SEQUENCE_ERROR:
- metadata.has_time_spec = _last_read_time_info.has_time_spec;
- metadata.time_spec =
- _last_read_time_info.time_spec
- + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
- metadata.out_of_sequence = true;
- metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ // Try to get buffs with a 0 timeout first. This avoids needing to check
+ // if radios are stopped due to overrun when packets are available.
+ auto result = _get_aligned_buffs(0);
+
+ if (result == get_aligned_buffs_t::TIMEOUT) {
+ if (_stopped_due_to_overrun) {
+ // An overrun occurred and the user has read all the packets
+ // that were buffered prior to the overrun. Call the overrun
+ // handler and return overrun error.
+ _handle_overrun();
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ _stopped_due_to_overrun = false;
return 0;
+ } else {
+ // Packets were not available with zero timeout, wait for them
+ // to arrive using the specified timeout.
+ result = _get_aligned_buffs(timeout_ms);
+ }
+ }
- default:
- UHD_THROW_INVALID_CODE_PATH();
+ if (result != get_aligned_buffs_t::SUCCESS) {
+ set_metadata_for_error(result, metadata);
+ return 0;
}
// Get payload pointers for each buffer and aggregate eob. We set eob to
@@ -167,6 +209,34 @@ public:
private:
using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+ void _handle_overrun()
+ {
+ // Flush any remaining packets. This method is called after any channel
+ // times out, so here we ensure all channels are flushed prior to
+ // calling the overrun handler to potentially restart the radios.
+ for (size_t chan = 0; chan < _xports.size(); chan++) {
+ if (_frame_buffs[chan]) {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+
+ frame_buff::uptr buff;
+ while (true) {
+ std::tie(buff, std::ignore, std::ignore) =
+ _xports[chan]->get_recv_buff(0);
+ if (!buff) {
+ break;
+ }
+ _xports[chan]->release_recv_buff(std::move(buff));
+ }
+ }
+
+ // Now call the overrun handler
+ if (_overrun_handler) {
+ _overrun_handler();
+ }
+ }
+
// Information recorded by streamer about the last data packet processed,
// used to create the metadata when there is a sequence error.
struct last_read_time_info_t
@@ -174,6 +244,16 @@ private:
size_t num_samps = 0;
bool has_time_spec = false;
time_spec_t time_spec;
+
+ std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate)
+ {
+ if (has_time_spec) {
+ return std::make_tuple(
+ true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate));
+ } else {
+ return std::make_tuple(false, time_spec_t());
+ }
+ }
};
// Transports for each channel
@@ -200,6 +280,13 @@ private:
// Information about the last data packet processed
last_read_time_info_t _last_read_time_info;
+
+ // Flag that indicates an overrun occurred. The streamer will return an
+ // overrun error when no more packets are available.
+ std::atomic<bool> _stopped_due_to_overrun{false};
+
+ // Callback for overrun
+ overrun_handler_t _overrun_handler;
};
}} // namespace uhd::transport
diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp
index 9d7257108..e400033b3 100644
--- a/host/lib/rfnoc/radio_control_impl.cpp
+++ b/host/lib/rfnoc/radio_control_impl.cpp
@@ -6,6 +6,7 @@
#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/rfnoc/mb_controller.hpp>
#include <uhdlib/rfnoc/radio_control_impl.hpp>
#include <uhdlib/utils/compat_check.hpp>
#include <map>
@@ -63,6 +64,8 @@ const uint32_t radio_control_impl::regmap::RX_CMD_TIMED_POS;
const uhd::fs_path radio_control_impl::DB_PATH("dboard");
const uhd::fs_path radio_control_impl::FE_PATH("frontends");
+static constexpr double OVERRUN_RESTART_DELAY = 0.05;
+
/****************************************************************************
* Structors
***************************************************************************/
@@ -74,7 +77,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
, _spc(_radio_width & 0xFFFF)
, _last_stream_cmd(
get_num_output_ports(), uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
- , _restart_cont(get_num_output_ports(), false)
{
uhd::assert_fpga_compat(MAJOR_COMPAT,
MINOR_COMPAT,
@@ -110,16 +112,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
return;
}
issue_stream_cmd(stream_cmd_action->stream_cmd, port);
- if (stream_cmd_action->stream_cmd.stream_mode
- == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
- && _restart_cont.at(port)) {
- RFNOC_LOG_TRACE("Received stop command after reporting overrun, will now "
- "request restart.");
- _restart_cont[port] = false;
- auto restart_request_action =
- action_info::make(ACTION_KEY_RX_RESTART_REQ);
- post_action({res_source_info::OUTPUT_EDGE, port}, restart_request_action);
- }
});
register_action_handler(ACTION_KEY_RX_RESTART_REQ,
[this](const res_source_info& src, action_info::sptr /*action*/) {
@@ -131,9 +123,10 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
}
auto stream_cmd_action = stream_cmd_action_info::make(
uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
- // FIXME
- // stream_cmd_action->stream_cmd.stream_now = false;
- // stream_cmd_action->stream_cmd.time_spec = get_time_now() + DELTA;
+ stream_cmd_action->stream_cmd.stream_now = false;
+ stream_cmd_action->stream_cmd.time_spec =
+ get_mb_controller()->get_timekeeper(0)->get_time_now() +
+ uhd::time_spec_t(OVERRUN_RESTART_DELAY);
const size_t port = src.instance;
if (port > get_num_output_ports()) {
RFNOC_LOG_WARNING("Received stream command to invalid output port!");
@@ -962,7 +955,6 @@ void radio_control_impl::async_message_handler(
rx_event_action->error_code = uhd::rx_metadata_t::ERROR_CODE_OVERFLOW;
const bool cont_mode = _last_stream_cmd.at(chan).stream_mode
== stream_cmd_t::STREAM_MODE_START_CONTINUOUS;
- _restart_cont[chan] = cont_mode;
rx_event_action->args["cont_mode"] = std::to_string(cont_mode);
RFNOC_LOG_TRACE("Posting overrun event action message.");
post_action(res_source_info{res_source_info::OUTPUT_EDGE, chan},
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
index cfa88b292..b50e2fe15 100644
--- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -18,16 +18,18 @@ using namespace uhd::rfnoc;
const std::string STREAMER_ID = "RxStreamer";
static std::atomic<uint64_t> streamer_inst_ctr;
-rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
- const uhd::stream_args_t stream_args)
+rfnoc_rx_streamer::rfnoc_rx_streamer(
+ const size_t num_chans, const uhd::stream_args_t stream_args)
: rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args)
, _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
, _stream_args(stream_args)
{
+ set_overrun_handler([this]() { this->_handle_overrun(); });
+
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
- //
+
register_action_handler(ACTION_KEY_RX_EVENT,
[this](const res_source_info& src, action_info::sptr action) {
rx_event_action_info::sptr rx_event_action =
@@ -38,10 +40,6 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
}
_handle_rx_event_action(src, rx_event_action);
});
- register_action_handler(ACTION_KEY_RX_RESTART_REQ,
- [this](const res_source_info& src, action_info::sptr action) {
- _handle_restart_request(src, action);
- });
register_action_handler(ACTION_KEY_STREAM_CMD,
[this](const res_source_info& src, action_info::sptr action) {
stream_cmd_action_info::sptr stream_cmd_action =
@@ -117,6 +115,15 @@ bool rfnoc_rx_streamer::check_topology(
return node_t::check_topology(connected_inputs, connected_outputs);
}
+void rfnoc_rx_streamer::_handle_overrun()
+{
+ if (_overrun_handling_mode) {
+ RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node...");
+ post_action({res_source_info::INPUT_EDGE, _overrun_channel},
+ action_info::make(ACTION_KEY_RX_RESTART_REQ));
+ }
+}
+
void rfnoc_rx_streamer::_register_props(const size_t chan,
const std::string& otw_format)
{
@@ -178,6 +185,7 @@ void rfnoc_rx_streamer::_handle_rx_event_action(
RFNOC_LOG_TRACE("Ignoring duplicate overrun message.");
return;
}
+ _overrun_channel = src.instance;
RFNOC_LOG_TRACE(
"Switching to overrun-handling mode: Stopping all upstream producers...");
auto stop_action =
@@ -188,32 +196,17 @@ void rfnoc_rx_streamer::_handle_rx_event_action(
post_action({res_source_info::INPUT_EDGE, i}, stop_action);
}
if (!rx_event_action->args.cast<bool>("cont_mode", false)) {
- // FIXME wait until it's safe to restart radios
- // If we don't need to restart, that's all we need to do
+ // If we don't need to restart, that's all we need to do. Clear this
+ // flag before setting the stopped due to overrun status below to
+ // avoid a potential race condition with the overrun handler.
_overrun_handling_mode = false;
}
+ // Tell the streamer to flag an overrun to the user after the data that
+ // was buffered prior to the overrun is read.
+ set_stopped_due_to_overrun();
}
}
-void rfnoc_rx_streamer::_handle_restart_request(
- const res_source_info& src, action_info::sptr)
-{
- // FIXME: Now we need to wait until it's safe to restart the radios.
- // A flush would achieve this, albeit at the cost of possibly losing
- // samples.
- // The earliest we can restart is when the FIFOs in upstream producers
- // are empty.
- RFNOC_LOG_TRACE("Waiting for FIFOs to clear");
-
- std::this_thread::sleep_for(100ms);
-
- // Once it's safe to restart the radios, we ask a radio to send us a
- // stream command with its current time.
- RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node...");
- post_action({res_source_info::INPUT_EDGE, src.instance},
- action_info::make(ACTION_KEY_RX_RESTART_REQ));
-}
-
void rfnoc_rx_streamer::_handle_stream_cmd_action(
const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action)
{