aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/include/uhdlib/transport')
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp13
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp137
2 files changed, 125 insertions, 25 deletions
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index d3fe97c7f..cc989e8f2 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -196,6 +196,19 @@ protected:
_zero_copy_streamer.set_tick_rate(rate);
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _zero_copy_streamer.set_stopped_due_to_overrun();
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(
+ typename rx_streamer_zero_copy<transport_t>::overrun_handler_t handler)
+ {
+ _zero_copy_streamer.set_overrun_handler(handler);
+ }
+
private:
//! Converter and associated item sizes
struct convert_info
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
index 36f568f2d..1f7320330 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -14,6 +14,7 @@
#include <uhdlib/transport/get_aligned_buffs.hpp>
#include <boost/format.hpp>
#include <vector>
+#include <atomic>
namespace uhd { namespace transport {
@@ -26,6 +27,8 @@ template <typename transport_t>
class rx_streamer_zero_copy
{
public:
+ using overrun_handler_t = std::function<void()>;
+
//! Constructor
rx_streamer_zero_copy(const size_t num_ports)
: _xports(num_ports)
@@ -84,6 +87,18 @@ public:
_bytes_per_item = bpi;
}
+ //! Notifies the streamer that an overrun has occured
+ void set_stopped_due_to_overrun()
+ {
+ _stopped_due_to_overrun = true;
+ }
+
+ //! Provides a callback to handle overruns
+ void set_overrun_handler(overrun_handler_t handler)
+ {
+ _overrun_handler = handler;
+ }
+
/*!
* Gets a set of time-aligned buffers, one per channel.
*
@@ -96,35 +111,62 @@ public:
rx_metadata_t& metadata,
const int32_t timeout_ms)
{
- metadata.reset();
+ // Function to set metadata based on alignment error
+ auto set_metadata_for_error =
+ [this](typename get_aligned_buffs_t::alignment_result_t error,
+ rx_metadata_t& metadata) {
+ switch (error) {
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ break;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ break;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ break;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ break;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ };
- switch (_get_aligned_buffs(timeout_ms)) {
- case get_aligned_buffs_t::SUCCESS:
- break;
-
- case get_aligned_buffs_t::BAD_PACKET:
- metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
- return 0;
-
- case get_aligned_buffs_t::TIMEOUT:
- metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
- return 0;
-
- case get_aligned_buffs_t::ALIGNMENT_FAILURE:
- metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
- return 0;
+ metadata.reset();
- case get_aligned_buffs_t::SEQUENCE_ERROR:
- metadata.has_time_spec = _last_read_time_info.has_time_spec;
- metadata.time_spec =
- _last_read_time_info.time_spec
- + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
- metadata.out_of_sequence = true;
- metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ // Try to get buffs with a 0 timeout first. This avoids needing to check
+ // if radios are stopped due to overrun when packets are available.
+ auto result = _get_aligned_buffs(0);
+
+ if (result == get_aligned_buffs_t::TIMEOUT) {
+ if (_stopped_due_to_overrun) {
+ // An overrun occurred and the user has read all the packets
+ // that were buffered prior to the overrun. Call the overrun
+ // handler and return overrun error.
+ _handle_overrun();
+ std::tie(metadata.has_time_spec, metadata.time_spec) =
+ _last_read_time_info.get_next_packet_time(_samp_rate);
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ _stopped_due_to_overrun = false;
return 0;
+ } else {
+ // Packets were not available with zero timeout, wait for them
+ // to arrive using the specified timeout.
+ result = _get_aligned_buffs(timeout_ms);
+ }
+ }
- default:
- UHD_THROW_INVALID_CODE_PATH();
+ if (result != get_aligned_buffs_t::SUCCESS) {
+ set_metadata_for_error(result, metadata);
+ return 0;
}
// Get payload pointers for each buffer and aggregate eob. We set eob to
@@ -167,6 +209,34 @@ public:
private:
using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+ void _handle_overrun()
+ {
+ // Flush any remaining packets. This method is called after any channel
+ // times out, so here we ensure all channels are flushed prior to
+ // calling the overrun handler to potentially restart the radios.
+ for (size_t chan = 0; chan < _xports.size(); chan++) {
+ if (_frame_buffs[chan]) {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+
+ frame_buff::uptr buff;
+ while (true) {
+ std::tie(buff, std::ignore, std::ignore) =
+ _xports[chan]->get_recv_buff(0);
+ if (!buff) {
+ break;
+ }
+ _xports[chan]->release_recv_buff(std::move(buff));
+ }
+ }
+
+ // Now call the overrun handler
+ if (_overrun_handler) {
+ _overrun_handler();
+ }
+ }
+
// Information recorded by streamer about the last data packet processed,
// used to create the metadata when there is a sequence error.
struct last_read_time_info_t
@@ -174,6 +244,16 @@ private:
size_t num_samps = 0;
bool has_time_spec = false;
time_spec_t time_spec;
+
+ std::tuple<bool, time_spec_t> get_next_packet_time(double samp_rate)
+ {
+ if (has_time_spec) {
+ return std::make_tuple(
+ true, time_spec + time_spec_t::from_ticks(num_samps, samp_rate));
+ } else {
+ return std::make_tuple(false, time_spec_t());
+ }
+ }
};
// Transports for each channel
@@ -200,6 +280,13 @@ private:
// Information about the last data packet processed
last_read_time_info_t _last_read_time_info;
+
+ // Flag that indicates an overrun occurred. The streamer will return an
+ // overrun error when no more packets are available.
+ std::atomic<bool> _stopped_due_to_overrun{false};
+
+ // Callback for overrun
+ overrun_handler_t _overrun_handler;
};
}} // namespace uhd::transport