aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include
diff options
context:
space:
mode:
authorAaron Rossetto <aaron.rossetto@ni.com>2019-09-27 14:56:25 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 12:21:32 -0800
commit2f97f8bd0167d4179427efa8a955046fbf417e91 (patch)
tree255c660b8bdff86047937a75a0f3b3798b1da73b /host/lib/include
parent41f142050fb39ad533f82256b574b5c08c160bc1 (diff)
downloaduhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.gz
uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.bz2
uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.zip
transport: Implement eov indications for Rx and Tx streams
Diffstat (limited to 'host/lib/include')
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp2
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp2
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp18
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp79
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp215
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp3
6 files changed, 274 insertions, 45 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
index 9c5d88066..3333f4f9d 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
@@ -108,6 +108,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -347,6 +348,7 @@ private:
packet_info_t info;
info.eob = header.get_eob();
+ info.eov = header.get_eov();
info.has_tsf = optional_time.is_initialized();
info.tsf = optional_time ? *optional_time : 0;
info.payload_bytes = _recv_packet->get_payload_size();
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
index 1e54a2f7a..8658767b6 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -112,6 +112,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -226,6 +227,7 @@ public:
}
_send_header.set_eob(info.eob);
+ _send_header.set_eov(info.eov);
_send_header.set_seq_num(_data_seq_num++);
_send_packet->refresh(buff->data(), _send_header, tsf);
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index 0691138e6..04cd12c59 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -139,10 +139,13 @@ public:
const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+ detail::eov_data_wrapper eov_positions(metadata);
+
size_t total_samps_recv =
- _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms);
+ _recv_one_packet(buffs, nsamps_per_buff, metadata, eov_positions, timeout_ms);
- if (one_packet or metadata.end_of_burst) {
+ if (one_packet or metadata.end_of_burst or
+ (eov_positions.data() and eov_positions.remaining() == 0)) {
return total_samps_recv;
}
@@ -153,13 +156,14 @@ public:
// Loop until buffer is filled or error code. This method returns the
// metadata from the first packet received, with the exception of
- // end-of-burst.
+ // end-of-burst and end-of-vector indications (if requested).
uhd::rx_metadata_t loop_metadata;
while (total_samps_recv < nsamps_per_buff) {
size_t num_samps = _recv_one_packet(buffs,
nsamps_per_buff - total_samps_recv,
loop_metadata,
+ eov_positions,
timeout_ms,
total_samps_recv * _convert_info.bytes_per_cpu_item);
@@ -176,6 +180,10 @@ public:
metadata.end_of_burst = true;
break;
}
+ // Return if the end-of-vector position array has been exhausted
+ if (eov_positions.data() and eov_positions.remaining() == 0) {
+ break;
+ }
}
return total_samps_recv;
@@ -246,13 +254,15 @@ private:
UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
const size_t nsamps_per_buff,
uhd::rx_metadata_t& metadata,
+ detail::eov_data_wrapper& eov_positions,
const int32_t timeout_ms,
const size_t buffer_offset_bytes = 0)
{
if (_buff_samps_remaining == 0) {
// Current set of buffers has expired, get the next one
_buff_samps_remaining =
- _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms);
+ _zero_copy_streamer.get_recv_buffs(
+ _in_buffs, metadata, eov_positions, timeout_ms);
_fragment_offset_in_samps = 0;
} else {
// There are samples still left in the current set of buffers
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
index 4da925062..98588c6f5 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -18,6 +18,66 @@
namespace uhd { namespace transport {
+namespace detail {
+
+class eov_data_wrapper
+{
+public:
+ eov_data_wrapper(uhd::rx_metadata_t& metadata)
+ : _metadata(metadata)
+ , _data(metadata.eov_positions)
+ , _size(metadata.eov_positions_size)
+ , _remaining(metadata.eov_positions_size)
+ , _write_pos(0)
+ , _running_sample_count(0)
+ {
+ }
+
+ ~eov_data_wrapper()
+ {
+ _metadata.eov_positions = _data;
+ _metadata.eov_positions_size = _size;
+ _metadata.eov_positions_count = _write_pos;
+ }
+
+ UHD_FORCE_INLINE size_t* data() const
+ {
+ return _data;
+ }
+
+ UHD_FORCE_INLINE size_t remaining() const
+ {
+ return _remaining;
+ }
+
+ UHD_FORCE_INLINE void push_back(size_t value)
+ {
+ assert(_data && _remaining > 0);
+ _data[_write_pos++] = value;
+ _remaining--;
+ }
+
+ UHD_FORCE_INLINE void update_running_sample_count(size_t num_samples)
+ {
+ _running_sample_count += num_samples;
+ }
+
+ UHD_FORCE_INLINE size_t get_running_sample_count() const
+ {
+ return _running_sample_count;
+ }
+
+private:
+ uhd::rx_metadata_t& _metadata;
+ size_t* _data;
+ size_t _size;
+ size_t _remaining;
+ size_t _write_pos;
+ size_t _running_sample_count;
+};
+
+} // namespace uhd::transport::detail
+
/*!
* Implementation of rx streamer manipulation of frame buffers and packet info.
* This class is part of rx_streamer_impl, split into a separate unit as it is
@@ -115,6 +175,7 @@ public:
*/
size_t get_recv_buffs(std::vector<const void*>& buffs,
rx_metadata_t& metadata,
+ detail::eov_data_wrapper& eov_positions,
const int32_t timeout_ms)
{
// Function to set metadata based on alignment error
@@ -184,10 +245,14 @@ public:
// Get payload pointers for each buffer and aggregate eob. We set eob to
// true if any channel has it set, since no more data will be received for
// that channel. In most cases, all channels should have the same value.
+ // We do the same for eov here, as it is expected that eov will be the
+ // same for all channels.
bool eob = false;
+ bool eov = false;
for (size_t i = 0; i < buffs.size(); i++) {
buffs[i] = _infos[i].payload;
eob |= _infos[i].eob;
+ eov |= _infos[i].eov;
}
// Set the metadata from the buffer information at index zero
@@ -199,10 +264,21 @@ public:
metadata.end_of_burst = eob;
metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+ // If the caller wants eov indications via metadata, then check
+ // eov and set the metadata values appropriately. Note that only
+ // channel 0 is checked for eov--in most cases, it should be the
+ // same for all channels.
+ if (eov_positions.data() && eov) {
+ eov_positions.push_back(
+ eov_positions.get_running_sample_count() +
+ info_0.payload_bytes / _bytes_per_item);
+ }
+
// Done with these packets, save timestamp info for next call
_last_read_time_info.has_time_spec = metadata.has_time_spec;
_last_read_time_info.time_spec = metadata.time_spec;
_last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item;
+ eov_positions.update_running_sample_count(_last_read_time_info.num_samps);
return _last_read_time_info.num_samps;
}
@@ -293,6 +369,9 @@ private:
// Information about the last data packet processed
last_read_time_info_t _last_read_time_info;
+ // Total number of samples read, used in determining EOV positions
+ size_t _total_num_samps = 0;
+
// Flag that indicates an overrun occurred. The streamer will return an
// overrun error when no more packets are available.
std::atomic<bool> _stopped_due_to_overrun{false};
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index fa84026fe..c594dd530 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -60,6 +60,41 @@ private:
uhd::tx_metadata_t _metadata_cache;
};
+class tx_eov_data_wrapper
+{
+public:
+ tx_eov_data_wrapper(const uhd::tx_metadata_t& metadata)
+ : _eov_positions(metadata.eov_positions)
+ , _eov_positions_size(metadata.eov_positions_size)
+ , _remaining(metadata.eov_positions_size)
+ , _read_pos(0)
+ {
+ }
+
+ UHD_FORCE_INLINE size_t* data() const
+ {
+ return _eov_positions;
+ }
+
+ UHD_FORCE_INLINE size_t remaining() const
+ {
+ return _remaining;
+ }
+
+ UHD_FORCE_INLINE size_t pop_front()
+ {
+ assert(_eov_positions && _remaining > 0);
+ _remaining--;
+ return _eov_positions[_read_pos++];
+ }
+
+private:
+ size_t* _eov_positions;
+ size_t _eov_positions_size;
+ size_t _remaining;
+ size_t _read_pos;
+};
+
} // namespace detail
/*!
@@ -125,60 +160,157 @@ public:
_metadata_cache.check(metadata);
- const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+ const bool eob_on_last_packet = metadata.end_of_burst;
- if (nsamps_per_buff == 0) {
- // Send requests with no samples are handled here, such as end of
- // burst. Send packets need to have at least one sample based on the
- // chdr specification, so we use _zero_buffs here.
- _send_one_packet(_zero_buffs.data(),
- 0, // buffer offset
- 1, // num samples
- metadata,
- timeout_ms);
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
- return 0;
- } else if (nsamps_per_buff <= _spp) {
- return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+ detail::tx_eov_data_wrapper eov_positions(metadata);
+
+ // If there are EOVs specified in the metadata, it will be necessary
+ // to break up the packet sends based on where the EOVs should be
+ // generated in the sequence of packets.
+ //
+ // `nsamps_to_send_remaining` represents the total number of
+ // samples remaining to send to fulfill the caller's request.
+ size_t nsamps_to_send_remaining = nsamps_per_buff;
+
+ // `nsamps_to_send` represents a subset of the total number of
+ // samples to send based on whether or not the caller's metadata
+ // specifies EOV positions.
+ // * If there are no EOVs, it represents the entire send request
+ // made by the caller. It may be broken up into chunks no larger
+ // than _spp later on in the function, but it will not be broken up
+ // due to EOV. There will only be one iteration through the do/
+ // while loop.
+ // * If there are EOVs, `nsamps_to_send` represents the number of
+ // samples to send to get to the next EOV position. Again, it may
+ // be broken up into chunks no larger than _spp, but note that the
+ // final chunk will have EOV signalled in its header. There may be
+ // multiple iterations through the do/while loop to fulfill the
+ // caller's entire send request.
+ size_t nsamps_to_send;
+
+ // `num_samps_sent` is the return value from each individual call
+ // to `_send_one_packet()`.
+ size_t num_samps_sent = 0;
+
+ // `total_nsamps_sent` accumulates the total number of samples sent
+ // in each chunk, and is used to determine the offset within `buffs`
+ // to pass to `_send_one_packet()`.
+ size_t total_nsamps_sent = 0;
+
+ size_t last_eov_position = 0;
+ bool eov;
+
+ do {
+ if (eov_positions.data() and eov_positions.remaining() > 0) {
+ size_t next_eov_position = eov_positions.pop_front();
+ // Check basic requirements: EOV positions must be monotonically
+ // increasing
+ if (next_eov_position <= last_eov_position) {
+ throw uhd::value_error("Invalid EOV position specified "
+ "(violates eov_pos[n] > eov_pos[n-1])");
+ }
+ // EOV position must be within the range of the samples written
+ if (next_eov_position > nsamps_per_buff) {
+ throw uhd::value_error("Invalid EOV position specified "
+ "(violates eov_pos[n] <= nsamps_per_buff)");
+ }
+ nsamps_to_send = next_eov_position - last_eov_position;
+ eov = true;
+ } else {
+ // No EOVs, or the EOV position list has been exhausted:
+ // simply send the remaining samples
+ nsamps_to_send = nsamps_to_send_remaining;
+ eov = false;
+ }
- } else {
- size_t total_num_samps_sent = 0;
- const bool eob = metadata.end_of_burst;
- metadata.end_of_burst = false;
+ if (nsamps_to_send == 0) {
+ // Send requests with no samples are handled here, such as end of
+ // burst. Send packets need to have at least one sample based on the
+ // chdr specification, so we use _zero_buffs here.
+ _send_one_packet(_zero_buffs.data(),
+ 0, // buffer offset
+ 1, // num samples
+ metadata,
+ false,
+ timeout_ms);
- const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
- const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+ return 0;
- for (size_t i = 0; i < num_fragments; i++) {
- const size_t num_samps_sent = _send_one_packet(
- buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+ } else if (nsamps_to_send <= _spp) {
+ // If last packet, apply saved EOB state to metadata
+ metadata.end_of_burst =
+ (eob_on_last_packet and nsamps_to_send == nsamps_to_send_remaining);
- total_num_samps_sent += num_samps_sent;
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, nsamps_to_send, metadata, eov, timeout_ms);
- if (num_samps_sent == 0) {
- return total_num_samps_sent;
+ metadata.start_of_burst = false;
+ } else {
+ // Note: since `nsamps_to_send` is guaranteed to be > _spp
+ // if the code reaches this else clause, `num_fragments` will
+ // always be at least 1.
+ const size_t num_fragments = (nsamps_to_send - 1) / _spp;
+ const size_t final_length = ((nsamps_to_send - 1) % _spp) + 1;
+
+ metadata.end_of_burst = false;
+
+ for (size_t i = 0; i < num_fragments; i++) {
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, _spp, metadata, false, timeout_ms);
+
+ // Advance sample accumulator and decrement remaining
+ // samples for this segment
+ total_nsamps_sent += num_samps_sent;
+ nsamps_to_send_remaining -= num_samps_sent;
+
+ if (num_samps_sent == 0) {
+ return total_nsamps_sent;
+ }
+
+ // Setup timespec for the next fragment
+ if (metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ metadata.start_of_burst = false;
}
- // Setup timespec for the next fragment
- if (metadata.has_time_spec) {
- metadata.time_spec =
- metadata.time_spec
- + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
- }
+ // Send the final fragment
+ metadata.end_of_burst =
+ (eob_on_last_packet and final_length == nsamps_to_send_remaining);
- metadata.start_of_burst = false;
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, final_length, metadata, eov, timeout);
}
- // Send the final fragment
- metadata.end_of_burst = eob;
+ // Advance sample accumulator and decrement remaining samples
+ total_nsamps_sent += num_samps_sent;
+ nsamps_to_send_remaining -= num_samps_sent;
- size_t nsamps_sent =
- total_num_samps_sent
- + _send_one_packet(
- buffs, total_num_samps_sent, final_length, metadata, timeout);
+ // Loop exit condition: return from `_send_one_packet()` indicates
+ // an error
+ if (num_samps_sent == 0) {
+ break;
+ }
- return nsamps_sent;
- }
+ // If there are more samples to be sent, thus requiring another
+ // trip around the do/while loop, update the timespec in the
+ // metadata for the next fragment (if desired)
+ if (nsamps_to_send_remaining > 0 and metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ last_eov_position = total_nsamps_sent;
+
+ } while (nsamps_to_send_remaining > 0);
+
+ return total_nsamps_sent;
}
protected:
@@ -233,10 +365,11 @@ private:
const size_t buffer_offset_in_samps,
const size_t num_samples,
const tx_metadata_t& metadata,
+ const bool eov,
const int32_t timeout_ms)
{
if (!_zero_copy_streamer.get_send_buffs(
- _out_buffs, num_samples, metadata, timeout_ms)) {
+ _out_buffs, num_samples, metadata, eov, timeout_ms)) {
return 0;
}
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
index 5ac7a1e8c..aa36c9d3b 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -75,12 +75,14 @@ public:
* \param buffs returns a pointer to the buffer data
* \param nsamps_per_buff the number of samples that will be written to each buffer
* \param metadata the metadata to write to the packet header
+ * \param eov EOV flag to write to the packet header
* \param timeout_ms timeout in milliseconds
* \return true if the operation was sucessful, false if timeout occurs
*/
UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,
const size_t nsamps_per_buff,
const tx_metadata_t& metadata,
+ const bool eov,
const int32_t timeout_ms)
{
// Try to get a buffer per channel
@@ -106,6 +108,7 @@ public:
info.payload_bytes = nsamps_per_buff * _bytes_per_item;
info.eob = metadata.end_of_burst;
+ info.eov = eov;
// Write packet header
for (size_t i = 0; i < buffs.size(); i++) {