diff options
author | Aaron Rossetto <aaron.rossetto@ni.com> | 2019-09-27 14:56:25 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 12:21:32 -0800 |
commit | 2f97f8bd0167d4179427efa8a955046fbf417e91 (patch) | |
tree | 255c660b8bdff86047937a75a0f3b3798b1da73b /host | |
parent | 41f142050fb39ad533f82256b574b5c08c160bc1 (diff) | |
download | uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.gz uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.tar.bz2 uhd-2f97f8bd0167d4179427efa8a955046fbf417e91.zip |
transport: Implement eov indications for Rx and Tx streams
Diffstat (limited to 'host')
-rw-r--r-- | host/include/uhd/types/metadata.hpp | 56 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp | 2 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 2 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_impl.hpp | 18 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp | 79 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | 215 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp | 3 | ||||
-rw-r--r-- | host/lib/types/types.cpp | 10 | ||||
-rw-r--r-- | host/tests/rx_streamer_test.cpp | 109 | ||||
-rw-r--r-- | host/tests/streamer_benchmark.cpp | 2 | ||||
-rw-r--r-- | host/tests/tx_streamer_test.cpp | 273 |
11 files changed, 711 insertions, 58 deletions
diff --git a/host/include/uhd/types/metadata.hpp b/host/include/uhd/types/metadata.hpp index a29cae93c..8ec321e5f 100644 --- a/host/include/uhd/types/metadata.hpp +++ b/host/include/uhd/types/metadata.hpp @@ -31,14 +31,17 @@ struct UHD_API rx_metadata_t //! Reset values. void reset() { - has_time_spec = false; - time_spec = time_spec_t(0.0); - more_fragments = false; - fragment_offset = 0; - start_of_burst = false; - end_of_burst = false; - error_code = ERROR_CODE_NONE; - out_of_sequence = false; + has_time_spec = false; + time_spec = time_spec_t(0.0); + more_fragments = false; + fragment_offset = 0; + start_of_burst = false; + end_of_burst = false; + eov_positions = nullptr; + eov_positions_size = 0; + eov_positions_count = 0; + error_code = ERROR_CODE_NONE; + out_of_sequence = false; } //! Has time specification? @@ -70,6 +73,31 @@ struct UHD_API rx_metadata_t bool end_of_burst; /*! + * If this pointer is not null, it specifies the address of an array of + * `size_t`s into which the sample offset relative to the beginning of + * a call to `recv()` of each vector (as denoted by packets with the `eov` + * header byte set) will be written. + * + * The caller is responsible for allocating and deallocating the storage + * for the array and for indicating the maximum number of elements in + * the array via the `eov_positions_size` value below. + * + * Upon return from `recv()`, `eov_positions_count` will be updated to + * indicate the number of valid entries written to the + * `end_of_vector_positions` array. It shall never exceed the value of + * `eov_positions_size`. However, if in the process of `recv()`, storage + * for new positions is exhausted, then `recv()` shall return. + */ + size_t* eov_positions; + size_t eov_positions_size; + + /*! + * Upon return from `recv()`, holds the number of end-of-vector indications + * in the `eov_positions` array. + */ + size_t eov_positions_count; + + /*! * The error condition on a receive call. * * Note: When an overrun occurs in continuous streaming mode, @@ -151,6 +179,18 @@ struct UHD_API tx_metadata_t bool end_of_burst; /*! + * If this pointer is not null, it specifies the address of an array of + * `size_t`s specifying the sample offset relative to the beginning of + * the call to `send()` where an EOV should be signalled. + * + * The caller is responsible for allocating and deallocating the storage + * for the array and for indicating the maximum number of elements in + * the array via the `eov_positions_size` value below. + */ + size_t* eov_positions = nullptr; + size_t eov_positions_size = 0; + + /*! * The default constructor: * Sets the fields to default values (flags set to false). */ diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp index 9c5d88066..3333f4f9d 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -108,6 +108,7 @@ public: struct packet_info_t { bool eob = false; + bool eov = false; bool has_tsf = false; uint64_t tsf = 0; size_t payload_bytes = 0; @@ -347,6 +348,7 @@ private: packet_info_t info; info.eob = header.get_eob(); + info.eov = header.get_eov(); info.has_tsf = optional_time.is_initialized(); info.tsf = optional_time ? *optional_time : 0; info.payload_bytes = _recv_packet->get_payload_size(); diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 1e54a2f7a..8658767b6 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -112,6 +112,7 @@ public: struct packet_info_t { bool eob = false; + bool eov = false; bool has_tsf = false; uint64_t tsf = 0; size_t payload_bytes = 0; @@ -226,6 +227,7 @@ public: } _send_header.set_eob(info.eob); + _send_header.set_eov(info.eov); _send_header.set_seq_num(_data_seq_num++); _send_packet->refresh(buff->data(), _send_header, tsf); diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp index 0691138e6..04cd12c59 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -139,10 +139,13 @@ public: const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + detail::eov_data_wrapper eov_positions(metadata); + size_t total_samps_recv = - _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); + _recv_one_packet(buffs, nsamps_per_buff, metadata, eov_positions, timeout_ms); - if (one_packet or metadata.end_of_burst) { + if (one_packet or metadata.end_of_burst or + (eov_positions.data() and eov_positions.remaining() == 0)) { return total_samps_recv; } @@ -153,13 +156,14 @@ public: // Loop until buffer is filled or error code. This method returns the // metadata from the first packet received, with the exception of - // end-of-burst. + // end-of-burst and end-of-vector indications (if requested). uhd::rx_metadata_t loop_metadata; while (total_samps_recv < nsamps_per_buff) { size_t num_samps = _recv_one_packet(buffs, nsamps_per_buff - total_samps_recv, loop_metadata, + eov_positions, timeout_ms, total_samps_recv * _convert_info.bytes_per_cpu_item); @@ -176,6 +180,10 @@ public: metadata.end_of_burst = true; break; } + // Return if the end-of-vector position array has been exhausted + if (eov_positions.data() and eov_positions.remaining() == 0) { + break; + } } return total_samps_recv; @@ -246,13 +254,15 @@ private: UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs, const size_t nsamps_per_buff, uhd::rx_metadata_t& metadata, + detail::eov_data_wrapper& eov_positions, const int32_t timeout_ms, const size_t buffer_offset_bytes = 0) { if (_buff_samps_remaining == 0) { // Current set of buffers has expired, get the next one _buff_samps_remaining = - _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); + _zero_copy_streamer.get_recv_buffs( + _in_buffs, metadata, eov_positions, timeout_ms); _fragment_offset_in_samps = 0; } else { // There are samples still left in the current set of buffers diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp index 4da925062..98588c6f5 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -18,6 +18,66 @@ namespace uhd { namespace transport { +namespace detail { + +class eov_data_wrapper +{ +public: + eov_data_wrapper(uhd::rx_metadata_t& metadata) + : _metadata(metadata) + , _data(metadata.eov_positions) + , _size(metadata.eov_positions_size) + , _remaining(metadata.eov_positions_size) + , _write_pos(0) + , _running_sample_count(0) + { + } + + ~eov_data_wrapper() + { + _metadata.eov_positions = _data; + _metadata.eov_positions_size = _size; + _metadata.eov_positions_count = _write_pos; + } + + UHD_FORCE_INLINE size_t* data() const + { + return _data; + } + + UHD_FORCE_INLINE size_t remaining() const + { + return _remaining; + } + + UHD_FORCE_INLINE void push_back(size_t value) + { + assert(_data && _remaining > 0); + _data[_write_pos++] = value; + _remaining--; + } + + UHD_FORCE_INLINE void update_running_sample_count(size_t num_samples) + { + _running_sample_count += num_samples; + } + + UHD_FORCE_INLINE size_t get_running_sample_count() const + { + return _running_sample_count; + } + +private: + uhd::rx_metadata_t& _metadata; + size_t* _data; + size_t _size; + size_t _remaining; + size_t _write_pos; + size_t _running_sample_count; +}; + +} // namespace uhd::transport::detail + /*! * Implementation of rx streamer manipulation of frame buffers and packet info. * This class is part of rx_streamer_impl, split into a separate unit as it is @@ -115,6 +175,7 @@ public: */ size_t get_recv_buffs(std::vector<const void*>& buffs, rx_metadata_t& metadata, + detail::eov_data_wrapper& eov_positions, const int32_t timeout_ms) { // Function to set metadata based on alignment error @@ -184,10 +245,14 @@ public: // Get payload pointers for each buffer and aggregate eob. We set eob to // true if any channel has it set, since no more data will be received for // that channel. In most cases, all channels should have the same value. + // We do the same for eov here, as it is expected that eov will be the + // same for all channels. bool eob = false; + bool eov = false; for (size_t i = 0; i < buffs.size(); i++) { buffs[i] = _infos[i].payload; eob |= _infos[i].eob; + eov |= _infos[i].eov; } // Set the metadata from the buffer information at index zero @@ -199,10 +264,21 @@ public: metadata.end_of_burst = eob; metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + // If the caller wants eov indications via metadata, then check + // eov and set the metadata values appropriately. Note that only + // channel 0 is checked for eov--in most cases, it should be the + // same for all channels. + if (eov_positions.data() && eov) { + eov_positions.push_back( + eov_positions.get_running_sample_count() + + info_0.payload_bytes / _bytes_per_item); + } + // Done with these packets, save timestamp info for next call _last_read_time_info.has_time_spec = metadata.has_time_spec; _last_read_time_info.time_spec = metadata.time_spec; _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item; + eov_positions.update_running_sample_count(_last_read_time_info.num_samps); return _last_read_time_info.num_samps; } @@ -293,6 +369,9 @@ private: // Information about the last data packet processed last_read_time_info_t _last_read_time_info; + // Total number of samples read, used in determining EOV positions + size_t _total_num_samps = 0; + // Flag that indicates an overrun occurred. The streamer will return an // overrun error when no more packets are available. std::atomic<bool> _stopped_due_to_overrun{false}; diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index fa84026fe..c594dd530 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -60,6 +60,41 @@ private: uhd::tx_metadata_t _metadata_cache; }; +class tx_eov_data_wrapper +{ +public: + tx_eov_data_wrapper(const uhd::tx_metadata_t& metadata) + : _eov_positions(metadata.eov_positions) + , _eov_positions_size(metadata.eov_positions_size) + , _remaining(metadata.eov_positions_size) + , _read_pos(0) + { + } + + UHD_FORCE_INLINE size_t* data() const + { + return _eov_positions; + } + + UHD_FORCE_INLINE size_t remaining() const + { + return _remaining; + } + + UHD_FORCE_INLINE size_t pop_front() + { + assert(_eov_positions && _remaining > 0); + _remaining--; + return _eov_positions[_read_pos++]; + } + +private: + size_t* _eov_positions; + size_t _eov_positions_size; + size_t _remaining; + size_t _read_pos; +}; + } // namespace detail /*! @@ -125,60 +160,157 @@ public: _metadata_cache.check(metadata); - const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + const bool eob_on_last_packet = metadata.end_of_burst; - if (nsamps_per_buff == 0) { - // Send requests with no samples are handled here, such as end of - // burst. Send packets need to have at least one sample based on the - // chdr specification, so we use _zero_buffs here. - _send_one_packet(_zero_buffs.data(), - 0, // buffer offset - 1, // num samples - metadata, - timeout_ms); + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); - return 0; - } else if (nsamps_per_buff <= _spp) { - return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + detail::tx_eov_data_wrapper eov_positions(metadata); + + // If there are EOVs specified in the metadata, it will be necessary + // to break up the packet sends based on where the EOVs should be + // generated in the sequence of packets. + // + // `nsamps_to_send_remaining` represents the total number of + // samples remaining to send to fulfill the caller's request. + size_t nsamps_to_send_remaining = nsamps_per_buff; + + // `nsamps_to_send` represents a subset of the total number of + // samples to send based on whether or not the caller's metadata + // specifies EOV positions. + // * If there are no EOVs, it represents the entire send request + // made by the caller. It may be broken up into chunks no larger + // than _spp later on in the function, but it will not be broken up + // due to EOV. There will only be one iteration through the do/ + // while loop. + // * If there are EOVs, `nsamps_to_send` represents the number of + // samples to send to get to the next EOV position. Again, it may + // be broken up into chunks no larger than _spp, but note that the + // final chunk will have EOV signalled in its header. There may be + // multiple iterations through the do/while loop to fulfill the + // caller's entire send request. + size_t nsamps_to_send; + + // `num_samps_sent` is the return value from each individual call + // to `_send_one_packet()`. + size_t num_samps_sent = 0; + + // `total_nsamps_sent` accumulates the total number of samples sent + // in each chunk, and is used to determine the offset within `buffs` + // to pass to `_send_one_packet()`. + size_t total_nsamps_sent = 0; + + size_t last_eov_position = 0; + bool eov; + + do { + if (eov_positions.data() and eov_positions.remaining() > 0) { + size_t next_eov_position = eov_positions.pop_front(); + // Check basic requirements: EOV positions must be monotonically + // increasing + if (next_eov_position <= last_eov_position) { + throw uhd::value_error("Invalid EOV position specified " + "(violates eov_pos[n] > eov_pos[n-1])"); + } + // EOV position must be within the range of the samples written + if (next_eov_position > nsamps_per_buff) { + throw uhd::value_error("Invalid EOV position specified " + "(violates eov_pos[n] <= nsamps_per_buff)"); + } + nsamps_to_send = next_eov_position - last_eov_position; + eov = true; + } else { + // No EOVs, or the EOV position list has been exhausted: + // simply send the remaining samples + nsamps_to_send = nsamps_to_send_remaining; + eov = false; + } - } else { - size_t total_num_samps_sent = 0; - const bool eob = metadata.end_of_burst; - metadata.end_of_burst = false; + if (nsamps_to_send == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + false, + timeout_ms); - const size_t num_fragments = (nsamps_per_buff - 1) / _spp; - const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + return 0; - for (size_t i = 0; i < num_fragments; i++) { - const size_t num_samps_sent = _send_one_packet( - buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + } else if (nsamps_to_send <= _spp) { + // If last packet, apply saved EOB state to metadata + metadata.end_of_burst = + (eob_on_last_packet and nsamps_to_send == nsamps_to_send_remaining); - total_num_samps_sent += num_samps_sent; + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, nsamps_to_send, metadata, eov, timeout_ms); - if (num_samps_sent == 0) { - return total_num_samps_sent; + metadata.start_of_burst = false; + } else { + // Note: since `nsamps_to_send` is guaranteed to be > _spp + // if the code reaches this else clause, `num_fragments` will + // always be at least 1. + const size_t num_fragments = (nsamps_to_send - 1) / _spp; + const size_t final_length = ((nsamps_to_send - 1) % _spp) + 1; + + metadata.end_of_burst = false; + + for (size_t i = 0; i < num_fragments; i++) { + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, _spp, metadata, false, timeout_ms); + + // Advance sample accumulator and decrement remaining + // samples for this segment + total_nsamps_sent += num_samps_sent; + nsamps_to_send_remaining -= num_samps_sent; + + if (num_samps_sent == 0) { + return total_nsamps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; } - // Setup timespec for the next fragment - if (metadata.has_time_spec) { - metadata.time_spec = - metadata.time_spec - + time_spec_t::from_ticks(num_samps_sent, _samp_rate); - } + // Send the final fragment + metadata.end_of_burst = + (eob_on_last_packet and final_length == nsamps_to_send_remaining); - metadata.start_of_burst = false; + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, final_length, metadata, eov, timeout); } - // Send the final fragment - metadata.end_of_burst = eob; + // Advance sample accumulator and decrement remaining samples + total_nsamps_sent += num_samps_sent; + nsamps_to_send_remaining -= num_samps_sent; - size_t nsamps_sent = - total_num_samps_sent - + _send_one_packet( - buffs, total_num_samps_sent, final_length, metadata, timeout); + // Loop exit condition: return from `_send_one_packet()` indicates + // an error + if (num_samps_sent == 0) { + break; + } - return nsamps_sent; - } + // If there are more samples to be sent, thus requiring another + // trip around the do/while loop, update the timespec in the + // metadata for the next fragment (if desired) + if (nsamps_to_send_remaining > 0 and metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + last_eov_position = total_nsamps_sent; + + } while (nsamps_to_send_remaining > 0); + + return total_nsamps_sent; } protected: @@ -233,10 +365,11 @@ private: const size_t buffer_offset_in_samps, const size_t num_samples, const tx_metadata_t& metadata, + const bool eov, const int32_t timeout_ms) { if (!_zero_copy_streamer.get_send_buffs( - _out_buffs, num_samples, metadata, timeout_ms)) { + _out_buffs, num_samples, metadata, eov, timeout_ms)) { return 0; } diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp index 5ac7a1e8c..aa36c9d3b 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -75,12 +75,14 @@ public: * \param buffs returns a pointer to the buffer data * \param nsamps_per_buff the number of samples that will be written to each buffer * \param metadata the metadata to write to the packet header + * \param eov EOV flag to write to the packet header * \param timeout_ms timeout in milliseconds * \return true if the operation was sucessful, false if timeout occurs */ UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs, const size_t nsamps_per_buff, const tx_metadata_t& metadata, + const bool eov, const int32_t timeout_ms) { // Try to get a buffer per channel @@ -106,6 +108,7 @@ public: info.payload_bytes = nsamps_per_buff * _bytes_per_item; info.eob = metadata.end_of_burst; + info.eov = eov; // Write packet header for (size_t i = 0; i < buffs.size(); i++) { diff --git a/host/lib/types/types.cpp b/host/lib/types/types.cpp index aa87f30a5..7e947a4f8 100644 --- a/host/lib/types/types.cpp +++ b/host/lib/types/types.cpp @@ -24,11 +24,11 @@ stream_cmd_t::stream_cmd_t(const stream_mode_t &stream_mode): /*********************************************************************** * metadata **********************************************************************/ -tx_metadata_t::tx_metadata_t(void): - has_time_spec(false), - time_spec(time_spec_t()), - start_of_burst(false), - end_of_burst(false) +tx_metadata_t::tx_metadata_t(void) + : has_time_spec(false) + , time_spec(time_spec_t()) + , start_of_burst(false) + , end_of_burst(false) { /* NOP */ } diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp index 1b1311908..953b82028 100644 --- a/host/tests/rx_streamer_test.cpp +++ b/host/tests/rx_streamer_test.cpp @@ -18,6 +18,7 @@ namespace uhd { namespace transport { struct mock_header_t { bool eob = false; + bool eov = false; bool has_tsf = false; uint64_t tsf = 0; size_t payload_bytes = 0; @@ -39,6 +40,7 @@ public: struct packet_info_t { bool eob = false; + bool eov = false; bool has_tsf = false; uint64_t tsf = 0; size_t payload_bytes = 0; @@ -55,6 +57,7 @@ public: packet_info_t info; info.eob = header.eob; + info.eov = header.eov; info.has_tsf = header.has_tsf; info.tsf = header.tsf; info.payload_bytes = header.payload_bytes; @@ -742,3 +745,109 @@ BOOST_AUTO_TEST_CASE(test_recv_alignment_error) BOOST_CHECK_EQUAL(num_samps_ret, 0); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT); } + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_eov) +{ + const size_t NUM_PACKETS = 5; + const std::string format("fc64"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * NUM_PACKETS; + std::vector<std::complex<double>> buff(num_samps); + + for (size_t i = 0; i < NUM_PACKETS; i++) { + mock_header_t header; + header.eob = false; + header.has_tsf = true; + header.tsf = i; + + for (size_t j = 0; j < NUM_PACKETS; j++) { + header.eov = (i == j); + push_back_recv_packet(recv_links[0], header, spp); + } + + uhd::rx_metadata_t metadata; + // Create a vector with storage for two EOVs even though we expect + // only one, since filling the EOV vector results in an early + // termination of `recv()` (which we don't want here). + std::vector<size_t> eov_positions(2); + metadata.eov_positions = eov_positions.data(); + metadata.eov_positions_size = eov_positions.size(); + + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.eov_positions, eov_positions.data()); + BOOST_CHECK_EQUAL(metadata.eov_positions_size, eov_positions.size()); + BOOST_CHECK_EQUAL(metadata.eov_positions_count, 1); + BOOST_CHECK_EQUAL(eov_positions[0], (i + 1) * spp); + } +} + +BOOST_AUTO_TEST_CASE(test_recv_two_channel_aggregate_eov) +{ + const size_t NUM_PACKETS = 20; + const std::string format("fc64"); + + // This vector defines which packets in each channel's mock link will + // signal EOV in their packet headers. + // + // For example, for a vector with 3 values, [3, 5, 8]: + // Link 0 packets with EOV: 3rd, 6th, 9th, 12th, 15th, ... + // Link 1 packets with EOV: 5th, 10th, 15th, 20th, ... + // Link 2 packets with EOV: 8th, 16th, 24th, 32nd, ... + const std::vector<size_t> eov_every_nth_packet{3, 5}; + + const size_t num_chans = eov_every_nth_packet.size(); + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * NUM_PACKETS; + + std::vector<std::vector<std::complex<double>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + mock_header_t header; + std::vector<size_t> expected_eov_offsets; + for (size_t i = 0; i < NUM_PACKETS; i++) { + bool eov = false; + for (size_t ch = 0; ch < num_chans; ch++) { + header.eob = false; + header.has_tsf = false; + header.eov = ((i + 1) % eov_every_nth_packet[ch]) == 0; + + push_back_recv_packet(recv_links[ch], header, spp); + + eov |= header.eov; + } + if(eov) { + expected_eov_offsets.push_back(spp * (i + 1)); + } + } + + uhd::rx_metadata_t metadata; + + std::vector<size_t> eov_positions(expected_eov_offsets.size() + 1); + metadata.eov_positions = eov_positions.data(); + metadata.eov_positions_size = eov_positions.size(); + + size_t num_samps_ret = + streamer->recv(buffers, num_samps, metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.eov_positions_count, expected_eov_offsets.size()); + for(size_t i = 0; i < metadata.eov_positions_count; i++) { + BOOST_CHECK_EQUAL(expected_eov_offsets[i], metadata.eov_positions[i]); + } +} diff --git a/host/tests/streamer_benchmark.cpp b/host/tests/streamer_benchmark.cpp index 02aa102a0..0c11441ed 100644 --- a/host/tests/streamer_benchmark.cpp +++ b/host/tests/streamer_benchmark.cpp @@ -47,6 +47,7 @@ public: struct packet_info_t { bool eob = false; + bool eov = false; bool has_tsf = false; uint64_t tsf = 0; size_t payload_bytes = 0; @@ -108,6 +109,7 @@ public: struct packet_info_t { bool eob = false; + bool eov = false; bool has_tsf = false; uint64_t tsf = 0; size_t payload_bytes = 0; diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp index 09c8357df..2288bd029 100644 --- a/host/tests/tx_streamer_test.cpp +++ b/host/tests/tx_streamer_test.cpp @@ -26,6 +26,7 @@ public: struct packet_info_t { bool eob = false; + bool eov = false; bool has_tsf = false; uint64_t tsf = 0; size_t payload_bytes = 0; @@ -158,6 +159,34 @@ pop_send_packet(mock_send_link::sptr send_link) return std::make_tuple(info, data, packet_samps, packet.first); } +//! Generates a non-biased random number in the range (low, high). +static size_t generate_rand(size_t low, size_t high) +{ + return (std::rand() % (high - low + 1)) + low; +} + +/*! + * Generates a vector of legal random EOV positions. + * `eovs` is a vector already sized to the desired number of EOV positions. + * The range [1, num_samps) will be broken into N=`eov.size()` + * non-overlapping adjacent ranges, and a random value within each range will + * be generated and stored in the vector. + */ +static void generate_random_eov_positions( + std::vector<size_t>& eovs, const size_t num_samps) +{ + UHD_ASSERT_THROW(!eovs.empty()); + + const size_t num_eovs = eovs.size(); + const size_t range_size = (num_samps - 1) / num_eovs; + size_t low = 1; + for (size_t i = 0; i < num_eovs; i++) { + const size_t high = low + range_size - 1; + eovs[i] = generate_rand(low, high); + low = high + 1; + } +} + /*! * Tests */ @@ -216,6 +245,250 @@ BOOST_AUTO_TEST_CASE(test_send_one_channel_one_packet) } } +BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_lte_spp) +{ + const size_t NUM_PKTS_TO_TEST = 30; + const std::string format("fc32"); + + auto send_links = make_links(1); + auto streamer = make_tx_streamer(send_links, format); + + // Allocate metadata + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + // Allocate buffer + const size_t num_samps = streamer->get_max_num_samps(); + std::vector<std::complex<float>> buff(num_samps); + + // Send buffer and check resultant packets + size_t num_accum_samps = 0; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + std::cout << "sending packet " << i << std::endl; + + // Vary number of EOVs for each send + const size_t num_eovs = (i % 10) + 1; + std::vector<size_t> eovs(num_eovs); + generate_random_eov_positions(eovs, num_samps); + + metadata.eov_positions = eovs.data(); + metadata.eov_positions_size = eovs.size(); + + const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0); + BOOST_CHECK_EQUAL(num_sent, num_samps); + metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + + mock_tx_data_xport::packet_info_t info; + std::complex<uint16_t>* data; + size_t packet_samps; + boost::shared_array<uint8_t> frame_buff; + + // Check number of packets written (should be # of EOVs plus one) + size_t num_packets_written = send_links[0]->get_num_packets(); + BOOST_CHECK_EQUAL(num_packets_written, eovs.size() + 1); + + // Pop each packets and check size of each relative to EOV positions + size_t total_samps_popped = 0; + size_t eov_index = 0; + size_t last_eov_position = 0; + while (total_samps_popped < num_samps) { + std::tie(info, data, packet_samps, frame_buff) = + pop_send_packet(send_links[0]); + + // All but the last packet should have an EOV indication + if (eov_index < eovs.size()) { + BOOST_CHECK_EQUAL(eovs[eov_index] - last_eov_position, packet_samps); + BOOST_CHECK(info.eov); + last_eov_position = eovs[eov_index]; + } else { + BOOST_CHECK_EQUAL(num_samps - last_eov_position, packet_samps); + BOOST_CHECK(not info.eov); + } + + // Verify correctness of TSF data + BOOST_CHECK(info.has_tsf); + BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); + + eov_index++; + num_accum_samps += packet_samps; + total_samps_popped += packet_samps; + } + } +} + +BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_gt_spp) +{ + const size_t NUM_PKTS_TO_TEST = 30; + const std::string format("fc32"); + + auto send_links = make_links(1); + auto streamer = make_tx_streamer(send_links, format); + + // Allocate metadata + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + // Allocate buffer + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * 50; + std::vector<std::complex<float>> buff(num_samps); + + // Send buffer and check resultant packets + size_t num_accum_samps = 0; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + std::cout << "sending packet " << i << std::endl; + + // Vary number of EOVs for each send + const size_t num_eovs = (i % 5) + 1; + std::vector<size_t> eovs(num_eovs); + generate_random_eov_positions(eovs, num_samps); + + metadata.eov_positions = eovs.data(); + metadata.eov_positions_size = eovs.size(); + + const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0); + BOOST_CHECK_EQUAL(num_sent, num_samps); + metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + + mock_tx_data_xport::packet_info_t info; + std::complex<uint16_t>* data; + size_t packet_samps; + boost::shared_array<uint8_t> frame_buff; + + size_t total_samps_popped = 0; + size_t eov_index = 0; + size_t last_eov_position = 0; + size_t distance_to_next_eov = eovs[eov_index] - last_eov_position; + size_t distance_to_end = num_samps; + while (total_samps_popped < num_samps) { + std::tie(info, data, packet_samps, frame_buff) = + pop_send_packet(send_links[0]); + + if (distance_to_next_eov <= spp) { + // Next EOV is within a single SPP: ensure packet is EOV + BOOST_CHECK_EQUAL(distance_to_next_eov, packet_samps); + BOOST_CHECK(info.eov); + last_eov_position = eovs[eov_index]; + eov_index++; + if (eov_index < eovs.size()) { + // NOTE: Addition of `packet_samps` is to compensate for + // its subtraction below + distance_to_next_eov = + eovs[eov_index] - last_eov_position + packet_samps; + } else { + // No more EOVs + distance_to_next_eov = std::numeric_limits<size_t>::max(); + } + } else if (distance_to_end <= spp) { + // End of data within a single SPP + BOOST_CHECK_EQUAL(distance_to_end, packet_samps); + BOOST_CHECK(not info.eov); + } else { + BOOST_CHECK(not info.eov); + } + + // Verify correctness of TSF data + BOOST_CHECK(info.has_tsf); + BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); + + distance_to_end -= packet_samps; + distance_to_next_eov -= packet_samps; + total_samps_popped += packet_samps; + num_accum_samps += packet_samps; + } + } +} + +BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_corner_case) +{ + const std::string format("fc32"); + + auto send_links = make_links(1); + auto streamer = make_tx_streamer(send_links, format); + + // Allocate metadata + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + // Allocate buffer + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * 2; + std::vector<std::complex<float>> buff(num_samps); + + // Mark every sample as an EOV :D + std::vector<size_t> eovs(num_samps); + for (size_t i = 0; i < num_samps; i++) { + eovs[i] = i + 1; + } + + metadata.eov_positions = eovs.data(); + metadata.eov_positions_size = eovs.size(); + + const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0); + BOOST_CHECK_EQUAL(num_sent, num_samps); + + mock_tx_data_xport::packet_info_t info; + std::complex<uint16_t>* data; + size_t packet_samps; + boost::shared_array<uint8_t> frame_buff; + + // Check all packets for EOV + BOOST_CHECK_EQUAL(send_links[0]->get_num_packets(), num_samps); + + for (size_t i = 0; i < num_samps; i++) { + std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]); + BOOST_CHECK_EQUAL(packet_samps, 1); + BOOST_CHECK(info.eov); + } +} + +BOOST_AUTO_TEST_CASE(test_send_one_channel_eov_error_cases) +{ + const std::string format("fc32"); + + auto send_links = make_links(1); + auto streamer = make_tx_streamer(send_links, format); + + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp; + std::vector<std::complex<float>> buff(num_samps); + + // Error case: EOV of 0 + size_t eov = 0; + metadata.eov_positions = &eov; + metadata.eov_positions_size = 1; + + BOOST_CHECK_THROW( + streamer->send(&buff.front(), num_samps, metadata, 1.0), uhd::value_error); + + // Error case: Adjacent EOV values that are the same + std::vector<size_t> eovs{100, 100}; + metadata.eov_positions = eovs.data(); + metadata.eov_positions_size = eovs.size(); + + BOOST_CHECK_THROW( + streamer->send(&buff.front(), num_samps, metadata, 1.0), uhd::value_error); + + // Error case: EOV values not monotonically increasing + eovs = {50, 25}; + + BOOST_CHECK_THROW( + streamer->send(&buff.front(), num_samps, metadata, 1.0), uhd::value_error); + + // Error case: EOV values greater than nsamps_per_buff + BOOST_CHECK_THROW( + streamer->send(&buff.front(), 1, metadata, 1.0), uhd::value_error); +} + BOOST_AUTO_TEST_CASE(test_send_one_channel_multi_packet) { const size_t NUM_BUFFS_TO_TEST = 5; |