aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp1
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp8
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp13
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp137
-rw-r--r--host/lib/rfnoc/radio_control_impl.cpp22
-rw-r--r--host/lib/rfnoc/rfnoc_rx_streamer.cpp49
6 files changed, 158 insertions, 72 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
index 5440c1e37..5327105c8 100644
--- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
+++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
@@ -324,7 +324,6 @@ private:
std::unordered_map<size_t, double> _rx_bandwidth;
std::vector<uhd::stream_cmd_t> _last_stream_cmd;
- std::vector<bool> _restart_cont;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
index 1d1d0805d..d39d88f43 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -10,8 +10,8 @@
#include <uhd/rfnoc/node.hpp>
#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <uhdlib/transport/rx_streamer_impl.hpp>
-#include <string>
#include <atomic>
+#include <string>
namespace uhd { namespace rfnoc {
@@ -75,16 +75,17 @@ public:
*/
bool check_topology(const std::vector<size_t>& connected_inputs,
const std::vector<size_t>& connected_outputs);
+
private:
void _register_props(const size_t chan, const std::string& otw_format);
void _handle_rx_event_action(
const res_source_info& src, rx_event_action_info::sptr rx_event_action);
- void _handle_restart_request(
- const res_source_info& src, action_info::sptr rx_event_action);
void _handle_stream_cmd_action(
const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action);
+ void _handle_overrun();
+
// Properties
std::vector<property_t<double>> _scaling_in;
std::vector<property_t<double>> _samp_rate_in;
@@ -98,6 +99,7 @@ private:
const uhd::stream_args_t _stream_args;
std::atomic<bool> _overrun_handling_mode{false};
+ size_t _overrun_channel = 0;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index d3fe97c7f..cc989e8f2 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -196,6 +196,19 @@ protected:
_zero_copy_streamer.set_tick_rate(rate);
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _zero_copy_streamer.set_stopped_due_to_overrun();
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(
+ typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler)
+ {
+ _zero_copy_streamer.set_overrun_handler(handler);
+ }
+
private:
//! Converter and associated item sizes
struct convert_info
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
index 36f568f2d..1f7320330 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -14,6 +14,7 @@
#include <uhdlib/transport/get_aligned_buffs.hpp>
#include <boost/format.hpp>
#include <vector>
+#include <atomic>
namespace uhd { namespace transport {
@@ -26,6 +27,8 @@ template <typename transport_t>
class rx_streamer_zero_copy
{
public:
+ using overrun_handler_t = std::function<void()>;
+
//! Constructor
rx_streamer_zero_copy(const size_t num_ports)
: _xports(num_ports)
@@ -84,6 +87,18 @@ public:
_bytes_per_item = bpi;
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _stopped_due_to_overrun = true;
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(overrun_handler_t handler)
+ {
+ _overrun_handler = handler;
+ }
+
/*!
* Gets a set of time-aligned buffers, one per channel.
*
@@ -96,35 +111,62 @@ public:
rx_metadata_t& metadata,
const int32_t timeout_ms)
{
- metadata.reset();
+ // Function to set metadata based on alignment error
+ auto set_metadata_for_error =
+ [this](typename get_aligned_buffs_t::alignment_result_t error,
+ rx_metadata_t& metadata) {
+ switch (error) {
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ break;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ break;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ break;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ break;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ };
- switch (_get_aligned_buffs(timeout_ms)) {
- case get_aligned_buffs_t::SUCCESS:
- break;
-
- case get_aligned_buffs_t::BAD_PACKET:
- metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
- return 0;
-
- case get_aligned_buffs_t::TIMEOUT:
- metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
- return 0;
-
- case get_aligned_buffs_t::ALIGNMENT_FAILURE:
- metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
- return 0;
+ metadata.reset();
- case get_aligned_buffs_t::SEQUENCE_ERROR:
- metadata.has_time_spec = _last_read_time_info.has_time_spec;
- metadata.time_spec =
- _last_read_time_info.time_spec
- + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
- metadata.out_of_sequence = true;
- metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ // Try to get buffs with a 0 timeout first. This avoids needing to check
+ // if radios are stopped due to overrun when packets are available.
+ auto result = _get_aligned_buffs(0);
+
+ if (result == get_aligned_buffs_t::TIMEOUT) {
+ if (_stopped_due_to_overrun) {
+ // An overrun occurred and the user has read all the packets
+ // that were buffered prior to the overrun. Call the overrun
+ // handler and return overrun error.
+ _handle_overrun();
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ _stopped_due_to_overrun = false;
return 0;
+ } else {
+ // Packets were not available with zero timeout, wait for them
+ // to arrive using the specified timeout.
+ result = _get_aligned_buffs(timeout_ms);
+ }
+ }
- default:
- UHD_THROW_INVALID_CODE_PATH();
+ if (result != get_aligned_buffs_t::SUCCESS) {
+ set_metadata_for_error(result, metadata);
+ return 0;
}
// Get payload pointers for each buffer and aggregate eob. We set eob to
@@ -167,6 +209,34 @@ public:
private:
using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+ void _handle_overrun()
+ {
+ // Flush any remaining packets. This method is called after any channel
+ // times out, so here we ensure all channels are flushed prior to
+ // calling the overrun handler to potentially restart the radios.
+ for (size_t chan = 0; chan < _xports.size(); chan++) {
+ if (_frame_buffs[chan]) {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+
+ frame_buff::uptr buff;
+ while (true) {
+ std::tie(buff, std::ignore, std::ignore) =
+ _xports[chan]->get_recv_buff(0);
+ if (!buff) {
+ break;
+ }
+ _xports[chan]->release_recv_buff(std::move(buff));
+ }
+ }
+
+ // Now call the overrun handler
+ if (_overrun_handler) {
+ _overrun_handler();
+ }
+ }
+
// Information recorded by streamer about the last data packet processed,
// used to create the metadata when there is a sequence error.
struct last_read_time_info_t
@@ -174,6 +244,16 @@ private:
size_t num_samps = 0;
bool has_time_spec = false;
time_spec_t time_spec;
+
+ std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate)
+ {
+ if (has_time_spec) {
+ return std::make_tuple(
+ true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate));
+ } else {
+ return std::make_tuple(false, time_spec_t());
+ }
+ }
};
// Transports for each channel
@@ -200,6 +280,13 @@ private:
// Information about the last data packet processed
last_read_time_info_t _last_read_time_info;
+
+ // Flag that indicates an overrun occurred. The streamer will return an
+ // overrun error when no more packets are available.
+ std::atomic<bool> _stopped_due_to_overrun{false};
+
+ // Callback for overrun
+ overrun_handler_t _overrun_handler;
};
}} // namespace uhd::transport
diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp
index 9d7257108..e400033b3 100644
--- a/host/lib/rfnoc/radio_control_impl.cpp
+++ b/host/lib/rfnoc/radio_control_impl.cpp
@@ -6,6 +6,7 @@
#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/rfnoc/mb_controller.hpp>
#include <uhdlib/rfnoc/radio_control_impl.hpp>
#include <uhdlib/utils/compat_check.hpp>
#include <map>
@@ -63,6 +64,8 @@ const uint32_t radio_control_impl::regmap::RX_CMD_TIMED_POS;
const uhd::fs_path radio_control_impl::DB_PATH("dboard");
const uhd::fs_path radio_control_impl::FE_PATH("frontends");
+static constexpr double OVERRUN_RESTART_DELAY = 0.05;
+
/****************************************************************************
* Structors
***************************************************************************/
@@ -74,7 +77,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
, _spc(_radio_width & 0xFFFF)
, _last_stream_cmd(
get_num_output_ports(), uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS)
- , _restart_cont(get_num_output_ports(), false)
{
uhd::assert_fpga_compat(MAJOR_COMPAT,
MINOR_COMPAT,
@@ -110,16 +112,6 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
return;
}
issue_stream_cmd(stream_cmd_action->stream_cmd, port);
- if (stream_cmd_action->stream_cmd.stream_mode
- == uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
- && _restart_cont.at(port)) {
- RFNOC_LOG_TRACE("Received stop command after reporting overrun, will now "
- "request restart.");
- _restart_cont[port] = false;
- auto restart_request_action =
- action_info::make(ACTION_KEY_RX_RESTART_REQ);
- post_action({res_source_info::OUTPUT_EDGE, port}, restart_request_action);
- }
});
register_action_handler(ACTION_KEY_RX_RESTART_REQ,
[this](const res_source_info& src, action_info::sptr /*action*/) {
@@ -131,9 +123,10 @@ radio_control_impl::radio_control_impl(make_args_ptr make_args)
}
auto stream_cmd_action = stream_cmd_action_info::make(
uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
- // FIXME
- // stream_cmd_action->stream_cmd.stream_now = false;
- // stream_cmd_action->stream_cmd.time_spec = get_time_now() + DELTA;
+ stream_cmd_action->stream_cmd.stream_now = false;
+ stream_cmd_action->stream_cmd.time_spec =
+ get_mb_controller()->get_timekeeper(0)->get_time_now() +
+ uhd::time_spec_t(OVERRUN_RESTART_DELAY);
const size_t port = src.instance;
if (port > get_num_output_ports()) {
RFNOC_LOG_WARNING("Received stream command to invalid output port!");
@@ -962,7 +955,6 @@ void radio_control_impl::async_message_handler(
rx_event_action->error_code = uhd::rx_metadata_t::ERROR_CODE_OVERFLOW;
const bool cont_mode = _last_stream_cmd.at(chan).stream_mode
== stream_cmd_t::STREAM_MODE_START_CONTINUOUS;
- _restart_cont[chan] = cont_mode;
rx_event_action->args["cont_mode"] = std::to_string(cont_mode);
RFNOC_LOG_TRACE("Posting overrun event action message.");
post_action(res_source_info{res_source_info::OUTPUT_EDGE, chan},
diff --git a/host/lib/rfnoc/rfnoc_rx_streamer.cpp b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
index cfa88b292..b50e2fe15 100644
--- a/host/lib/rfnoc/rfnoc_rx_streamer.cpp
+++ b/host/lib/rfnoc/rfnoc_rx_streamer.cpp
@@ -18,16 +18,18 @@ using namespace uhd::rfnoc;
const std::string STREAMER_ID = "RxStreamer";
static std::atomic<uint64_t> streamer_inst_ctr;
-rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
- const uhd::stream_args_t stream_args)
+rfnoc_rx_streamer::rfnoc_rx_streamer(
+ const size_t num_chans, const uhd::stream_args_t stream_args)
: rx_streamer_impl<chdr_rx_data_xport>(num_chans, stream_args)
, _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++))
, _stream_args(stream_args)
{
+ set_overrun_handler([this]() { this->_handle_overrun(); });
+
// No block to which to forward properties or actions
set_prop_forwarding_policy(forwarding_policy_t::DROP);
set_action_forwarding_policy(forwarding_policy_t::DROP);
- //
+
register_action_handler(ACTION_KEY_RX_EVENT,
[this](const res_source_info& src, action_info::sptr action) {
rx_event_action_info::sptr rx_event_action =
@@ -38,10 +40,6 @@ rfnoc_rx_streamer::rfnoc_rx_streamer(const size_t num_chans,
}
_handle_rx_event_action(src, rx_event_action);
});
- register_action_handler(ACTION_KEY_RX_RESTART_REQ,
- [this](const res_source_info& src, action_info::sptr action) {
- _handle_restart_request(src, action);
- });
register_action_handler(ACTION_KEY_STREAM_CMD,
[this](const res_source_info& src, action_info::sptr action) {
stream_cmd_action_info::sptr stream_cmd_action =
@@ -117,6 +115,15 @@ bool rfnoc_rx_streamer::check_topology(
return node_t::check_topology(connected_inputs, connected_outputs);
}
+void rfnoc_rx_streamer::_handle_overrun()
+{
+ if (_overrun_handling_mode) {
+ RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node...");
+ post_action({res_source_info::INPUT_EDGE, _overrun_channel},
+ action_info::make(ACTION_KEY_RX_RESTART_REQ));
+ }
+}
+
void rfnoc_rx_streamer::_register_props(const size_t chan,
const std::string& otw_format)
{
@@ -178,6 +185,7 @@ void rfnoc_rx_streamer::_handle_rx_event_action(
RFNOC_LOG_TRACE("Ignoring duplicate overrun message.");
return;
}
+ _overrun_channel = src.instance;
RFNOC_LOG_TRACE(
"Switching to overrun-handling mode: Stopping all upstream producers...");
auto stop_action =
@@ -188,32 +196,17 @@ void rfnoc_rx_streamer::_handle_rx_event_action(
post_action({res_source_info::INPUT_EDGE, i}, stop_action);
}
if (!rx_event_action->args.cast<bool>("cont_mode", false)) {
- // FIXME wait until it's safe to restart radios
- // If we don't need to restart, that's all we need to do
+ // If we don't need to restart, that's all we need to do. Clear this
+ // flag before setting the stopped due to overrun status below to
+ // avoid a potential race condition with the overrun handler.
_overrun_handling_mode = false;
}
+ // Tell the streamer to flag an overrun to the user after the data that
+ // was buffered prior to the overrun is read.
+ set_stopped_due_to_overrun();
}
}
-void rfnoc_rx_streamer::_handle_restart_request(
- const res_source_info& src, action_info::sptr)
-{
- // FIXME: Now we need to wait until it's safe to restart the radios.
- // A flush would achieve this, albeit at the cost of possibly losing
- // samples.
- // The earliest we can restart is when the FIFOs in upstream producers
- // are empty.
- RFNOC_LOG_TRACE("Waiting for FIFOs to clear");
-
- std::this_thread::sleep_for(100ms);
-
- // Once it's safe to restart the radios, we ask a radio to send us a
- // stream command with its current time.
- RFNOC_LOG_TRACE("Requesting restart from overrun-reporting node...");
- post_action({res_source_info::INPUT_EDGE, src.instance},
- action_info::make(ACTION_KEY_RX_RESTART_REQ));
-}
-
void rfnoc_rx_streamer::_handle_stream_cmd_action(
const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action)
{