aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/include/uhdlib')
-rw-r--r--host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp1
-rw-r--r--host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp8
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp13
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp137
4 files changed, 130 insertions, 29 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
index 5440c1e37..5327105c8 100644
--- a/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
+++ b/host/lib/include/uhdlib/rfnoc/radio_control_impl.hpp
@@ -324,7 +324,6 @@ private:
std::unordered_map<size_t, double> _rx_bandwidth;
std::vector<uhd::stream_cmd_t> _last_stream_cmd;
- std::vector<bool> _restart_cont;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
index 1d1d0805d..d39d88f43 100644
--- a/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
+++ b/host/lib/include/uhdlib/rfnoc/rfnoc_rx_streamer.hpp
@@ -10,8 +10,8 @@
#include <uhd/rfnoc/node.hpp>
#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
#include <uhdlib/transport/rx_streamer_impl.hpp>
-#include <string>
#include <atomic>
+#include <string>
namespace uhd { namespace rfnoc {
@@ -75,16 +75,17 @@ public:
*/
bool check_topology(const std::vector<size_t>& connected_inputs,
const std::vector<size_t>& connected_outputs);
+
private:
void _register_props(const size_t chan, const std::string& otw_format);
void _handle_rx_event_action(
const res_source_info& src, rx_event_action_info::sptr rx_event_action);
- void _handle_restart_request(
- const res_source_info& src, action_info::sptr rx_event_action);
void _handle_stream_cmd_action(
const res_source_info& src, stream_cmd_action_info::sptr stream_cmd_action);
+ void _handle_overrun();
+
// Properties
std::vector<property_t<double>> _scaling_in;
std::vector<property_t<double>> _samp_rate_in;
@@ -98,6 +99,7 @@ private:
const uhd::stream_args_t _stream_args;
std::atomic<bool> _overrun_handling_mode{false};
+ size_t _overrun_channel = 0;
};
}} // namespace uhd::rfnoc
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index d3fe97c7f..cc989e8f2 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -196,6 +196,19 @@ protected:
_zero_copy_streamer.set_tick_rate(rate);
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _zero_copy_streamer.set_stopped_due_to_overrun();
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(
+ typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler)
+ {
+ _zero_copy_streamer.set_overrun_handler(handler);
+ }
+
private:
//! Converter and associated item sizes
struct convert_info
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
index 36f568f2d..1f7320330 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -14,6 +14,7 @@
#include <uhdlib/transport/get_aligned_buffs.hpp>
#include <boost/format.hpp>
#include <vector>
+#include <atomic>
namespace uhd { namespace transport {
@@ -26,6 +27,8 @@ template <typename transport_t>
class rx_streamer_zero_copy
{
public:
+ using overrun_handler_t = std::function<void()>;
+
//! Constructor
rx_streamer_zero_copy(const size_t num_ports)
: _xports(num_ports)
@@ -84,6 +87,18 @@ public:
_bytes_per_item = bpi;
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _stopped_due_to_overrun = true;
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(overrun_handler_t handler)
+ {
+ _overrun_handler = handler;
+ }
+
/*!
* Gets a set of time-aligned buffers, one per channel.
*
@@ -96,35 +111,62 @@ public:
rx_metadata_t& metadata,
const int32_t timeout_ms)
{
- metadata.reset();
+ // Function to set metadata based on alignment error
+ auto set_metadata_for_error =
+ [this](typename get_aligned_buffs_t::alignment_result_t error,
+ rx_metadata_t& metadata) {
+ switch (error) {
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ break;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ break;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ break;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ break;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ };
- switch (_get_aligned_buffs(timeout_ms)) {
- case get_aligned_buffs_t::SUCCESS:
- break;
-
- case get_aligned_buffs_t::BAD_PACKET:
- metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
- return 0;
-
- case get_aligned_buffs_t::TIMEOUT:
- metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
- return 0;
-
- case get_aligned_buffs_t::ALIGNMENT_FAILURE:
- metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
- return 0;
+ metadata.reset();
- case get_aligned_buffs_t::SEQUENCE_ERROR:
- metadata.has_time_spec = _last_read_time_info.has_time_spec;
- metadata.time_spec =
- _last_read_time_info.time_spec
- + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
- metadata.out_of_sequence = true;
- metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ // Try to get buffs with a 0 timeout first. This avoids needing to check
+ // if radios are stopped due to overrun when packets are available.
+ auto result = _get_aligned_buffs(0);
+
+ if (result == get_aligned_buffs_t::TIMEOUT) {
+ if (_stopped_due_to_overrun) {
+ // An overrun occurred and the user has read all the packets
+ // that were buffered prior to the overrun. Call the overrun
+ // handler and return overrun error.
+ _handle_overrun();
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ _stopped_due_to_overrun = false;
return 0;
+ } else {
+ // Packets were not available with zero timeout, wait for them
+ // to arrive using the specified timeout.
+ result = _get_aligned_buffs(timeout_ms);
+ }
+ }
- default:
- UHD_THROW_INVALID_CODE_PATH();
+ if (result != get_aligned_buffs_t::SUCCESS) {
+ set_metadata_for_error(result, metadata);
+ return 0;
}
// Get payload pointers for each buffer and aggregate eob. We set eob to
@@ -167,6 +209,34 @@ public:
private:
using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+ void _handle_overrun()
+ {
+ // Flush any remaining packets. This method is called after any channel
+ // times out, so here we ensure all channels are flushed prior to
+ // calling the overrun handler to potentially restart the radios.
+ for (size_t chan = 0; chan < _xports.size(); chan++) {
+ if (_frame_buffs[chan]) {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+
+ frame_buff::uptr buff;
+ while (true) {
+ std::tie(buff, std::ignore, std::ignore) =
+ _xports[chan]->get_recv_buff(0);
+ if (!buff) {
+ break;
+ }
+ _xports[chan]->release_recv_buff(std::move(buff));
+ }
+ }
+
+ // Now call the overrun handler
+ if (_overrun_handler) {
+ _overrun_handler();
+ }
+ }
+
// Information recorded by streamer about the last data packet processed,
// used to create the metadata when there is a sequence error.
struct last_read_time_info_t
@@ -174,6 +244,16 @@ private:
size_t num_samps = 0;
bool has_time_spec = false;
time_spec_t time_spec;
+
+ std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate)
+ {
+ if (has_time_spec) {
+ return std::make_tuple(
+ true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate));
+ } else {
+ return std::make_tuple(false, time_spec_t());
+ }
+ }
};
// Transports for each channel
@@ -200,6 +280,13 @@ private:
// Information about the last data packet processed
last_read_time_info_t _last_read_time_info;
+
+ // Flag that indicates an overrun occurred. The streamer will return an
+ // overrun error when no more packets are available.
+ std::atomic<bool> _stopped_due_to_overrun{false};
+
+ // Callback for overrun
+ overrun_handler_t _overrun_handler;
};
}} // namespace uhd::transport