aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAaron Rossetto <aaron.rossetto@ni.com>2019-09-27 14:56:25 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 12:21:32 -0800
commit2f97f8bd0167d4179427efa8a955046fbf417e91 (patch)
tree255c660b8bdff86047937a75a0f3b3798b1da73b
parent41f142050fb39ad533f82256b574b5c08c160bc1 (diff)
downloaduhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.gz
uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.bz2
uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.zip
transport: Implement eov indications for Rx and Tx streams
-rw-r--r--host/include/uhd/types/metadata.hpp56
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp2
-rw-r--r--host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp2
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp18
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp79
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp215
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp3
-rw-r--r--host/lib/types/types.cpp10
-rw-r--r--host/tests/rx_streamer_test.cpp109
-rw-r--r--host/tests/streamer_benchmark.cpp2
-rw-r--r--host/tests/tx_streamer_test.cpp273
11 files changed, 711 insertions, 58 deletions
diff --git a/host/include/uhd/types/metadata.hpp b/host/include/uhd/types/metadata.hpp
index a29cae93c..8ec321e5f 100644
--- a/host/include/uhd/types/metadata.hpp
+++ b/host/include/uhd/types/metadata.hpp
@@ -31,14 +31,17 @@ struct UHD_API rx_metadata_t
//! Reset values.
void reset()
{
- has_time_spec = false;
- time_spec = time_spec_t(0.0);
- more_fragments = false;
- fragment_offset = 0;
- start_of_burst = false;
- end_of_burst = false;
- error_code = ERROR_CODE_NONE;
- out_of_sequence = false;
+ has_time_spec = false;
+ time_spec = time_spec_t(0.0);
+ more_fragments = false;
+ fragment_offset = 0;
+ start_of_burst = false;
+ end_of_burst = false;
+ eov_positions = nullptr;
+ eov_positions_size = 0;
+ eov_positions_count = 0;
+ error_code = ERROR_CODE_NONE;
+ out_of_sequence = false;
}
//! Has time specification?
@@ -70,6 +73,31 @@ struct UHD_API rx_metadata_t
bool end_of_burst;
/*!
+ * If this pointer is not null, it specifies the address of an array of
+ * `size_t`s into which the sample offset relative to the beginning of
+ * a call to `recv()` of each vector (as denoted by packets with the `eov`
+ * header byte set) will be written.
+ *
+ * The caller is responsible for allocating and deallocating the storage
+ * for the array and for indicating the maximum number of elements in
+ * the array via the `eov_positions_size` value below.
+ *
+ * Upon return from `recv()`, `eov_positions_count` will be updated to
+ * indicate the number of valid entries written to the
+ * `end_of_vector_positions` array. It shall never exceed the value of
+ * `eov_positions_size`. However, if in the process of `recv()`, storage
+ * for new positions is exhausted, then `recv()` shall return.
+ */
+ size_t* eov_positions;
+ size_t eov_positions_size;
+
+ /*!
+ * Upon return from `recv()`, holds the number of end-of-vector indications
+ * in the `eov_positions` array.
+ */
+ size_t eov_positions_count;
+
+ /*!
* The error condition on a receive call.
*
* Note: When an overrun occurs in continuous streaming mode,
@@ -151,6 +179,18 @@ struct UHD_API tx_metadata_t
bool end_of_burst;
/*!
+ * If this pointer is not null, it specifies the address of an array of
+ * `size_t`s specifying the sample offset relative to the beginning of
+ * the call to `send()` where an EOV should be signalled.
+ *
+ * The caller is responsible for allocating and deallocating the storage
+ * for the array and for indicating the maximum number of elements in
+ * the array via the `eov_positions_size` value below.
+ */
+ size_t* eov_positions = nullptr;
+ size_t eov_positions_size = 0;
+
+ /*!
* The default constructor:
* Sets the fields to default values (flags set to false).
*/
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
index 9c5d88066..3333f4f9d 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp
@@ -108,6 +108,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -347,6 +348,7 @@ private:
packet_info_t info;
info.eob = header.get_eob();
+ info.eov = header.get_eov();
info.has_tsf = optional_time.is_initialized();
info.tsf = optional_time ? *optional_time : 0;
info.payload_bytes = _recv_packet->get_payload_size();
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
index 1e54a2f7a..8658767b6 100644
--- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
+++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp
@@ -112,6 +112,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -226,6 +227,7 @@ public:
}
_send_header.set_eob(info.eob);
+ _send_header.set_eov(info.eov);
_send_header.set_seq_num(_data_seq_num++);
_send_packet->refresh(buff->data(), _send_header, tsf);
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
index 0691138e6..04cd12c59 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -139,10 +139,13 @@ public:
const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+ detail::eov_data_wrapper eov_positions(metadata);
+
size_t total_samps_recv =
- _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms);
+ _recv_one_packet(buffs, nsamps_per_buff, metadata, eov_positions, timeout_ms);
- if (one_packet or metadata.end_of_burst) {
+ if (one_packet or metadata.end_of_burst or
+ (eov_positions.data() and eov_positions.remaining() == 0)) {
return total_samps_recv;
}
@@ -153,13 +156,14 @@ public:
// Loop until buffer is filled or error code. This method returns the
// metadata from the first packet received, with the exception of
- // end-of-burst.
+ // end-of-burst and end-of-vector indications (if requested).
uhd::rx_metadata_t loop_metadata;
while (total_samps_recv < nsamps_per_buff) {
size_t num_samps = _recv_one_packet(buffs,
nsamps_per_buff - total_samps_recv,
loop_metadata,
+ eov_positions,
timeout_ms,
total_samps_recv * _convert_info.bytes_per_cpu_item);
@@ -176,6 +180,10 @@ public:
metadata.end_of_burst = true;
break;
}
+ // Return if the end-of-vector position array has been exhausted
+ if (eov_positions.data() and eov_positions.remaining() == 0) {
+ break;
+ }
}
return total_samps_recv;
@@ -246,13 +254,15 @@ private:
UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
const size_t nsamps_per_buff,
uhd::rx_metadata_t& metadata,
+ detail::eov_data_wrapper& eov_positions,
const int32_t timeout_ms,
const size_t buffer_offset_bytes = 0)
{
if (_buff_samps_remaining == 0) {
// Current set of buffers has expired, get the next one
_buff_samps_remaining =
- _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms);
+ _zero_copy_streamer.get_recv_buffs(
+ _in_buffs, metadata, eov_positions, timeout_ms);
_fragment_offset_in_samps = 0;
} else {
// There are samples still left in the current set of buffers
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
index 4da925062..98588c6f5 100644
--- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -18,6 +18,66 @@
namespace uhd { namespace transport {
+namespace detail {
+
+class eov_data_wrapper
+{
+public:
+ eov_data_wrapper(uhd::rx_metadata_t& metadata)
+ : _metadata(metadata)
+ , _data(metadata.eov_positions)
+ , _size(metadata.eov_positions_size)
+ , _remaining(metadata.eov_positions_size)
+ , _write_pos(0)
+ , _running_sample_count(0)
+ {
+ }
+
+ ~eov_data_wrapper()
+ {
+ _metadata.eov_positions = _data;
+ _metadata.eov_positions_size = _size;
+ _metadata.eov_positions_count = _write_pos;
+ }
+
+ UHD_FORCE_INLINE size_t* data() const
+ {
+ return _data;
+ }
+
+ UHD_FORCE_INLINE size_t remaining() const
+ {
+ return _remaining;
+ }
+
+ UHD_FORCE_INLINE void push_back(size_t value)
+ {
+ assert(_data && _remaining > 0);
+ _data[_write_pos++] = value;
+ _remaining--;
+ }
+
+ UHD_FORCE_INLINE void update_running_sample_count(size_t num_samples)
+ {
+ _running_sample_count += num_samples;
+ }
+
+ UHD_FORCE_INLINE size_t get_running_sample_count() const
+ {
+ return _running_sample_count;
+ }
+
+private:
+ uhd::rx_metadata_t& _metadata;
+ size_t* _data;
+ size_t _size;
+ size_t _remaining;
+ size_t _write_pos;
+ size_t _running_sample_count;
+};
+
+} // namespace uhd::transport::detail
+
/*!
* Implementation of rx streamer manipulation of frame buffers and packet info.
* This class is part of rx_streamer_impl, split into a separate unit as it is
@@ -115,6 +175,7 @@ public:
*/
size_t get_recv_buffs(std::vector<const void*>& buffs,
rx_metadata_t& metadata,
+ detail::eov_data_wrapper& eov_positions,
const int32_t timeout_ms)
{
// Function to set metadata based on alignment error
@@ -184,10 +245,14 @@ public:
// Get payload pointers for each buffer and aggregate eob. We set eob to
// true if any channel has it set, since no more data will be received for
// that channel. In most cases, all channels should have the same value.
+ // We do the same for eov here, as it is expected that eov will be the
+ // same for all channels.
bool eob = false;
+ bool eov = false;
for (size_t i = 0; i < buffs.size(); i++) {
buffs[i] = _infos[i].payload;
eob |= _infos[i].eob;
+ eov |= _infos[i].eov;
}
// Set the metadata from the buffer information at index zero
@@ -199,10 +264,21 @@ public:
metadata.end_of_burst = eob;
metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+ // If the caller wants eov indications via metadata, then check
+ // eov and set the metadata values appropriately. Note that only
+ // channel 0 is checked for eov--in most cases, it should be the
+ // same for all channels.
+ if (eov_positions.data() && eov) {
+ eov_positions.push_back(
+ eov_positions.get_running_sample_count() +
+ info_0.payload_bytes / _bytes_per_item);
+ }
+
// Done with these packets, save timestamp info for next call
_last_read_time_info.has_time_spec = metadata.has_time_spec;
_last_read_time_info.time_spec = metadata.time_spec;
_last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item;
+ eov_positions.update_running_sample_count(_last_read_time_info.num_samps);
return _last_read_time_info.num_samps;
}
@@ -293,6 +369,9 @@ private:
// Information about the last data packet processed
last_read_time_info_t _last_read_time_info;
+ // Total number of samples read, used in determining EOV positions
+ size_t _total_num_samps = 0;
+
// Flag that indicates an overrun occurred. The streamer will return an
// overrun error when no more packets are available.
std::atomic<bool> _stopped_due_to_overrun{false};
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
index fa84026fe..c594dd530 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -60,6 +60,41 @@ private:
uhd::tx_metadata_t _metadata_cache;
};
+class tx_eov_data_wrapper
+{
+public:
+ tx_eov_data_wrapper(const uhd::tx_metadata_t& metadata)
+ : _eov_positions(metadata.eov_positions)
+ , _eov_positions_size(metadata.eov_positions_size)
+ , _remaining(metadata.eov_positions_size)
+ , _read_pos(0)
+ {
+ }
+
+ UHD_FORCE_INLINE size_t* data() const
+ {
+ return _eov_positions;
+ }
+
+ UHD_FORCE_INLINE size_t remaining() const
+ {
+ return _remaining;
+ }
+
+ UHD_FORCE_INLINE size_t pop_front()
+ {
+ assert(_eov_positions && _remaining > 0);
+ _remaining--;
+ return _eov_positions[_read_pos++];
+ }
+
+private:
+ size_t* _eov_positions;
+ size_t _eov_positions_size;
+ size_t _remaining;
+ size_t _read_pos;
+};
+
} // namespace detail
/*!
@@ -125,60 +160,157 @@ public:
_metadata_cache.check(metadata);
- const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+ const bool eob_on_last_packet = metadata.end_of_burst;
- if (nsamps_per_buff == 0) {
- // Send requests with no samples are handled here, such as end of
- // burst. Send packets need to have at least one sample based on the
- // chdr specification, so we use _zero_buffs here.
- _send_one_packet(_zero_buffs.data(),
- 0, // buffer offset
- 1, // num samples
- metadata,
- timeout_ms);
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
- return 0;
- } else if (nsamps_per_buff <= _spp) {
- return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+ detail::tx_eov_data_wrapper eov_positions(metadata);
+
+ // If there are EOVs specified in the metadata, it will be necessary
+ // to break up the packet sends based on where the EOVs should be
+ // generated in the sequence of packets.
+ //
+ // `nsamps_to_send_remaining` represents the total number of
+ // samples remaining to send to fulfill the caller's request.
+ size_t nsamps_to_send_remaining = nsamps_per_buff;
+
+ // `nsamps_to_send` represents a subset of the total number of
+ // samples to send based on whether or not the caller's metadata
+ // specifies EOV positions.
+ // * If there are no EOVs, it represents the entire send request
+ // made by the caller. It may be broken up into chunks no larger
+ // than _spp later on in the function, but it will not be broken up
+ // due to EOV. There will only be one iteration through the do/
+ // while loop.
+ // * If there are EOVs, `nsamps_to_send` represents the number of
+ // samples to send to get to the next EOV position. Again, it may
+ // be broken up into chunks no larger than _spp, but note that the
+ // final chunk will have EOV signalled in its header. There may be
+ // multiple iterations through the do/while loop to fulfill the
+ // caller's entire send request.
+ size_t nsamps_to_send;
+
+ // `num_samps_sent` is the return value from each individual call
+ // to `_send_one_packet()`.
+ size_t num_samps_sent = 0;
+
+ // `total_nsamps_sent` accumulates the total number of samples sent
+ // in each chunk, and is used to determine the offset within `buffs`
+ // to pass to `_send_one_packet()`.
+ size_t total_nsamps_sent = 0;
+
+ size_t last_eov_position = 0;
+ bool eov;
+
+ do {
+ if (eov_positions.data() and eov_positions.remaining() > 0) {
+ size_t next_eov_position = eov_positions.pop_front();
+ // Check basic requirements: EOV positions must be monotonically
+ // increasing
+ if (next_eov_position <= last_eov_position) {
+ throw uhd::value_error("Invalid EOV position specified "
+ "(violates eov_pos[n] > eov_pos[n-1])");
+ }
+ // EOV position must be within the range of the samples written
+ if (next_eov_position > nsamps_per_buff) {
+ throw uhd::value_error("Invalid EOV position specified "
+ "(violates eov_pos[n] <= nsamps_per_buff)");
+ }
+ nsamps_to_send = next_eov_position - last_eov_position;
+ eov = true;
+ } else {
+ // No EOVs, or the EOV position list has been exhausted:
+ // simply send the remaining samples
+ nsamps_to_send = nsamps_to_send_remaining;
+ eov = false;
+ }
- } else {
- size_t total_num_samps_sent = 0;
- const bool eob = metadata.end_of_burst;
- metadata.end_of_burst = false;
+ if (nsamps_to_send == 0) {
+ // Send requests with no samples are handled here, such as end of
+ // burst. Send packets need to have at least one sample based on the
+ // chdr specification, so we use _zero_buffs here.
+ _send_one_packet(_zero_buffs.data(),
+ 0, // buffer offset
+ 1, // num samples
+ metadata,
+ false,
+ timeout_ms);
- const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
- const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+ return 0;
- for (size_t i = 0; i < num_fragments; i++) {
- const size_t num_samps_sent = _send_one_packet(
- buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+ } else if (nsamps_to_send <= _spp) {
+ // If last packet, apply saved EOB state to metadata
+ metadata.end_of_burst =
+ (eob_on_last_packet and nsamps_to_send == nsamps_to_send_remaining);
- total_num_samps_sent += num_samps_sent;
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, nsamps_to_send, metadata, eov, timeout_ms);
- if (num_samps_sent == 0) {
- return total_num_samps_sent;
+ metadata.start_of_burst = false;
+ } else {
+ // Note: since `nsamps_to_send` is guaranteed to be > _spp
+ // if the code reaches this else clause, `num_fragments` will
+ // always be at least 1.
+ const size_t num_fragments = (nsamps_to_send - 1) / _spp;
+ const size_t final_length = ((nsamps_to_send - 1) % _spp) + 1;
+
+ metadata.end_of_burst = false;
+
+ for (size_t i = 0; i < num_fragments; i++) {
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, _spp, metadata, false, timeout_ms);
+
+ // Advance sample accumulator and decrement remaining
+ // samples for this segment
+ total_nsamps_sent += num_samps_sent;
+ nsamps_to_send_remaining -= num_samps_sent;
+
+ if (num_samps_sent == 0) {
+ return total_nsamps_sent;
+ }
+
+ // Setup timespec for the next fragment
+ if (metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ metadata.start_of_burst = false;
}
- // Setup timespec for the next fragment
- if (metadata.has_time_spec) {
- metadata.time_spec =
- metadata.time_spec
- + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
- }
+ // Send the final fragment
+ metadata.end_of_burst =
+ (eob_on_last_packet and final_length == nsamps_to_send_remaining);
- metadata.start_of_burst = false;
+ num_samps_sent = _send_one_packet(
+ buffs, total_nsamps_sent, final_length, metadata, eov, timeout);
}
- // Send the final fragment
- metadata.end_of_burst = eob;
+ // Advance sample accumulator and decrement remaining samples
+ total_nsamps_sent += num_samps_sent;
+ nsamps_to_send_remaining -= num_samps_sent;
- size_t nsamps_sent =
- total_num_samps_sent
- + _send_one_packet(
- buffs, total_num_samps_sent, final_length, metadata, timeout);
+ // Loop exit condition: return from `_send_one_packet()` indicates
+ // an error
+ if (num_samps_sent == 0) {
+ break;
+ }
- return nsamps_sent;
- }
+ // If there are more samples to be sent, thus requiring another
+ // trip around the do/while loop, update the timespec in the
+ // metadata for the next fragment (if desired)
+ if (nsamps_to_send_remaining > 0 and metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ last_eov_position = total_nsamps_sent;
+
+ } while (nsamps_to_send_remaining > 0);
+
+ return total_nsamps_sent;
}
protected:
@@ -233,10 +365,11 @@ private:
const size_t buffer_offset_in_samps,
const size_t num_samples,
const tx_metadata_t& metadata,
+ const bool eov,
const int32_t timeout_ms)
{
if (!_zero_copy_streamer.get_send_buffs(
- _out_buffs, num_samples, metadata, timeout_ms)) {
+ _out_buffs, num_samples, metadata, eov, timeout_ms)) {
return 0;
}
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
index 5ac7a1e8c..aa36c9d3b 100644
--- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -75,12 +75,14 @@ public:
* \param buffs returns a pointer to the buffer data
* \param nsamps_per_buff the number of samples that will be written to each buffer
* \param metadata the metadata to write to the packet header
+ * \param eov EOV flag to write to the packet header
* \param timeout_ms timeout in milliseconds
* \return true if the operation was sucessful, false if timeout occurs
*/
UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,
const size_t nsamps_per_buff,
const tx_metadata_t& metadata,
+ const bool eov,
const int32_t timeout_ms)
{
// Try to get a buffer per channel
@@ -106,6 +108,7 @@ public:
info.payload_bytes = nsamps_per_buff * _bytes_per_item;
info.eob = metadata.end_of_burst;
+ info.eov = eov;
// Write packet header
for (size_t i = 0; i < buffs.size(); i++) {
diff --git a/host/lib/types/types.cpp b/host/lib/types/types.cpp
index aa87f30a5..7e947a4f8 100644
--- a/host/lib/types/types.cpp
+++ b/host/lib/types/types.cpp
@@ -24,11 +24,11 @@ stream_cmd_t::stream_cmd_t(const stream_mode_t &stream_mode):
/***********************************************************************
* metadata
**********************************************************************/
-tx_metadata_t::tx_metadata_t(void):
- has_time_spec(false),
- time_spec(time_spec_t()),
- start_of_burst(false),
- end_of_burst(false)
+tx_metadata_t::tx_metadata_t(void)
+ : has_time_spec(false)
+ , time_spec(time_spec_t())
+ , start_of_burst(false)
+ , end_of_burst(false)
{
/* NOP */
}
diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp
index 1b1311908..953b82028 100644
--- a/host/tests/rx_streamer_test.cpp
+++ b/host/tests/rx_streamer_test.cpp
@@ -18,6 +18,7 @@ namespace uhd { namespace transport {
struct mock_header_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -39,6 +40,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -55,6 +57,7 @@ public:
packet_info_t info;
info.eob = header.eob;
+ info.eov = header.eov;
info.has_tsf = header.has_tsf;
info.tsf = header.tsf;
info.payload_bytes = header.payload_bytes;
@@ -742,3 +745,109 @@ BOOST_AUTO_TEST_CASE(test_recv_alignment_error)
BOOST_CHECK_EQUAL(num_samps_ret, 0);
BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT);
}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_eov)
+{
+ const size_t NUM_PACKETS = 5;
+ const std::string format("fc64");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * NUM_PACKETS;
+ std::vector<std::complex<double>> buff(num_samps);
+
+ for (size_t i = 0; i < NUM_PACKETS; i++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.tsf = i;
+
+ for (size_t j = 0; j < NUM_PACKETS; j++) {
+ header.eov = (i == j);
+ push_back_recv_packet(recv_links[0], header, spp);
+ }
+
+ uhd::rx_metadata_t metadata;
+ // Create a vector with storage for two EOVs even though we expect
+ // only one, since filling the EOV vector results in an early
+ // termination of `recv()` (which we don't want here).
+ std::vector<size_t> eov_positions(2);
+ metadata.eov_positions = eov_positions.data();
+ metadata.eov_positions_size = eov_positions.size();
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.eov_positions, eov_positions.data());
+ BOOST_CHECK_EQUAL(metadata.eov_positions_size, eov_positions.size());
+ BOOST_CHECK_EQUAL(metadata.eov_positions_count, 1);
+ BOOST_CHECK_EQUAL(eov_positions[0], (i + 1) * spp);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_two_channel_aggregate_eov)
+{
+ const size_t NUM_PACKETS = 20;
+ const std::string format("fc64");
+
+ // This vector defines which packets in each channel's mock link will
+ // signal EOV in their packet headers.
+ //
+ // For example, for a vector with 3 values, [3, 5, 8]:
+ // Link 0 packets with EOV: 3rd, 6th, 9th, 12th, 15th, ...
+ // Link 1 packets with EOV: 5th, 10th, 15th, 20th, ...
+ // Link 2 packets with EOV: 8th, 16th, 24th, 32nd, ...
+ const std::vector<size_t> eov_every_nth_packet{3, 5};
+
+ const size_t num_chans = eov_every_nth_packet.size();
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * NUM_PACKETS;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ mock_header_t header;
+ std::vector<size_t> expected_eov_offsets;
+ for (size_t i = 0; i < NUM_PACKETS; i++) {
+ bool eov = false;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ header.eob = false;
+ header.has_tsf = false;
+ header.eov = ((i + 1) % eov_every_nth_packet[ch]) == 0;
+
+ push_back_recv_packet(recv_links[ch], header, spp);
+
+ eov |= header.eov;
+ }
+ if(eov) {
+ expected_eov_offsets.push_back(spp * (i + 1));
+ }
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ std::vector<size_t> eov_positions(expected_eov_offsets.size() + 1);
+ metadata.eov_positions = eov_positions.data();
+ metadata.eov_positions_size = eov_positions.size();
+
+ size_t num_samps_ret =
+ streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.eov_positions_count, expected_eov_offsets.size());
+ for(size_t i = 0; i < metadata.eov_positions_count; i++) {
+ BOOST_CHECK_EQUAL(expected_eov_offsets[i], metadata.eov_positions[i]);
+ }
+}
diff --git a/host/tests/streamer_benchmark.cpp b/host/tests/streamer_benchmark.cpp
index 02aa102a0..0c11441ed 100644
--- a/host/tests/streamer_benchmark.cpp
+++ b/host/tests/streamer_benchmark.cpp
@@ -47,6 +47,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -108,6 +109,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp
index 09c8357df..2288bd029 100644
--- a/host/tests/tx_streamer_test.cpp
+++ b/host/tests/tx_streamer_test.cpp
@@ -26,6 +26,7 @@ public:
struct packet_info_t
{
bool eob = false;
+ bool eov = false;
bool has_tsf = false;
uint64_t tsf = 0;
size_t payload_bytes = 0;
@@ -158,6 +159,34 @@ pop_send_packet(mock_send_link::sptr send_link)
return std::make_tuple(info, data, packet_samps, packet.first);
}
+//! Generates a non-biased random number in the range (low, high).
+static size_t generate_rand(size_t low, size_t high)
+{
+ return (std::rand() % (high - low + 1)) + low;
+}
+
+/*!
+ * Generates a vector of legal random EOV positions.
+ * `eovs` is a vector already sized to the desired number of EOV positions.
+ * The range [1, num_samps) will be broken into N=`eov.size()`
+ * non-overlapping adjacent ranges, and a random value within each range will
+ * be generated and stored in the vector.
+ */
+static void generate_random_eov_positions(
+ std::vector<size_t>& eovs, const size_t num_samps)
+{
+ UHD_ASSERT_THROW(!eovs.empty());
+
+ const size_t num_eovs = eovs.size();
+ const size_t range_size = (num_samps - 1) / num_eovs;
+ size_t low = 1;
+ for (size_t i = 0; i < num_eovs; i++) {
+ const size_t high = low + range_size - 1;
+ eovs[i] = generate_rand(low, high);
+ low = high + 1;
+ }
+}
+
/*!
* Tests
*/
@@ -216,6 +245,250 @@ BOOST_AUTO_TEST_CASE(test_send_one_channel_one_packet)
}
}
+BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_lte_spp)
+{
+ const size_t NUM_PKTS_TO_TEST = 30;
+ const std::string format("fc32");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer
+ const size_t num_samps = streamer->get_max_num_samps();
+ std::vector<std::complex<float>> buff(num_samps);
+
+ // Send buffer and check resultant packets
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ // Vary number of EOVs for each send
+ const size_t num_eovs = (i % 10) + 1;
+ std::vector<size_t> eovs(num_eovs);
+ generate_random_eov_positions(eovs, num_samps);
+
+ metadata.eov_positions = eovs.data();
+ metadata.eov_positions_size = eovs.size();
+
+ const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ // Check number of packets written (should be # of EOVs plus one)
+ size_t num_packets_written = send_links[0]->get_num_packets();
+ BOOST_CHECK_EQUAL(num_packets_written, eovs.size() + 1);
+
+ // Pop each packets and check size of each relative to EOV positions
+ size_t total_samps_popped = 0;
+ size_t eov_index = 0;
+ size_t last_eov_position = 0;
+ while (total_samps_popped < num_samps) {
+ std::tie(info, data, packet_samps, frame_buff) =
+ pop_send_packet(send_links[0]);
+
+ // All but the last packet should have an EOV indication
+ if (eov_index < eovs.size()) {
+ BOOST_CHECK_EQUAL(eovs[eov_index] - last_eov_position, packet_samps);
+ BOOST_CHECK(info.eov);
+ last_eov_position = eovs[eov_index];
+ } else {
+ BOOST_CHECK_EQUAL(num_samps - last_eov_position, packet_samps);
+ BOOST_CHECK(not info.eov);
+ }
+
+ // Verify correctness of TSF data
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE);
+
+ eov_index++;
+ num_accum_samps += packet_samps;
+ total_samps_popped += packet_samps;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_gt_spp)
+{
+ const size_t NUM_PKTS_TO_TEST = 30;
+ const std::string format("fc32");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * 50;
+ std::vector<std::complex<float>> buff(num_samps);
+
+ // Send buffer and check resultant packets
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ // Vary number of EOVs for each send
+ const size_t num_eovs = (i % 5) + 1;
+ std::vector<size_t> eovs(num_eovs);
+ generate_random_eov_positions(eovs, num_samps);
+
+ metadata.eov_positions = eovs.data();
+ metadata.eov_positions_size = eovs.size();
+
+ const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ size_t total_samps_popped = 0;
+ size_t eov_index = 0;
+ size_t last_eov_position = 0;
+ size_t distance_to_next_eov = eovs[eov_index] - last_eov_position;
+ size_t distance_to_end = num_samps;
+ while (total_samps_popped < num_samps) {
+ std::tie(info, data, packet_samps, frame_buff) =
+ pop_send_packet(send_links[0]);
+
+ if (distance_to_next_eov <= spp) {
+ // Next EOV is within a single SPP: ensure packet is EOV
+ BOOST_CHECK_EQUAL(distance_to_next_eov, packet_samps);
+ BOOST_CHECK(info.eov);
+ last_eov_position = eovs[eov_index];
+ eov_index++;
+ if (eov_index < eovs.size()) {
+ // NOTE: Addition of `packet_samps` is to compensate for
+ // its subtraction below
+ distance_to_next_eov =
+ eovs[eov_index] - last_eov_position + packet_samps;
+ } else {
+ // No more EOVs
+ distance_to_next_eov = std::numeric_limits<size_t>::max();
+ }
+ } else if (distance_to_end <= spp) {
+ // End of data within a single SPP
+ BOOST_CHECK_EQUAL(distance_to_end, packet_samps);
+ BOOST_CHECK(not info.eov);
+ } else {
+ BOOST_CHECK(not info.eov);
+ }
+
+ // Verify correctness of TSF data
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE);
+
+ distance_to_end -= packet_samps;
+ distance_to_next_eov -= packet_samps;
+ total_samps_popped += packet_samps;
+ num_accum_samps += packet_samps;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_corner_case)
+{
+ const std::string format("fc32");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * 2;
+ std::vector<std::complex<float>> buff(num_samps);
+
+ // Mark every sample as an EOV :D
+ std::vector<size_t> eovs(num_samps);
+ for (size_t i = 0; i < num_samps; i++) {
+ eovs[i] = i + 1;
+ }
+
+ metadata.eov_positions = eovs.data();
+ metadata.eov_positions_size = eovs.size();
+
+ const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ // Check all packets for EOV
+ BOOST_CHECK_EQUAL(send_links[0]->get_num_packets(), num_samps);
+
+ for (size_t i = 0; i < num_samps; i++) {
+ std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]);
+ BOOST_CHECK_EQUAL(packet_samps, 1);
+ BOOST_CHECK(info.eov);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_error_cases)
+{
+ const std::string format("fc32");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp;
+ std::vector<std::complex<float>> buff(num_samps);
+
+ // Error case: EOV of 0
+ size_t eov = 0;
+ metadata.eov_positions = &eov;
+ metadata.eov_positions_size = 1;
+
+ BOOST_CHECK_THROW(
+ streamer->send(&buff.front(), num_samps, metadata, 1.0), uhd::value_error);
+
+ // Error case: Adjacent EOV values that are the same
+ std::vector<size_t> eovs{100, 100};
+ metadata.eov_positions = eovs.data();
+ metadata.eov_positions_size = eovs.size();
+
+ BOOST_CHECK_THROW(
+ streamer->send(&buff.front(), num_samps, metadata, 1.0), uhd::value_error);
+
+ // Error case: EOV values not monotonically increasing
+ eovs = {50, 25};
+
+ BOOST_CHECK_THROW(
+ streamer->send(&buff.front(), num_samps, metadata, 1.0), uhd::value_error);
+
+ // Error case: EOV values greater than nsamps_per_buff
+ BOOST_CHECK_THROW(
+ streamer->send(&buff.front(), 1, metadata, 1.0), uhd::value_error);
+}
+
BOOST_AUTO_TEST_CASE(test_send_one_channel_multi_packet)
{
const size_t NUM_BUFFS_TO_TEST = 5;