From 2f97f8bd0167d4179427efa8a955046fbf417e91 Mon Sep 17 00:00:00 2001 From: Aaron Rossetto Date: Fri, 27 Sep 2019 14:56:25 -0500 Subject: transport: Implement eov indications for Rx and Tx streams --- .../include/uhdlib/transport/rx_streamer_impl.hpp | 18 +- .../uhdlib/transport/rx_streamer_zero_copy.hpp | 79 ++++++++ .../include/uhdlib/transport/tx_streamer_impl.hpp | 215 +++++++++++++++++---- .../uhdlib/transport/tx_streamer_zero_copy.hpp | 3 + 4 files changed, 270 insertions(+), 45 deletions(-) (limited to 'host/lib/include/uhdlib/transport') diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp index 0691138e6..04cd12c59 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -139,10 +139,13 @@ public: const int32_t timeout_ms = static_cast(timeout * 1000); + detail::eov_data_wrapper eov_positions(metadata); + size_t total_samps_recv = - _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); + _recv_one_packet(buffs, nsamps_per_buff, metadata, eov_positions, timeout_ms); - if (one_packet or metadata.end_of_burst) { + if (one_packet or metadata.end_of_burst or + (eov_positions.data() and eov_positions.remaining() == 0)) { return total_samps_recv; } @@ -153,13 +156,14 @@ public: // Loop until buffer is filled or error code. This method returns the // metadata from the first packet received, with the exception of - // end-of-burst. + // end-of-burst and end-of-vector indications (if requested). uhd::rx_metadata_t loop_metadata; while (total_samps_recv < nsamps_per_buff) { size_t num_samps = _recv_one_packet(buffs, nsamps_per_buff - total_samps_recv, loop_metadata, + eov_positions, timeout_ms, total_samps_recv * _convert_info.bytes_per_cpu_item); @@ -176,6 +180,10 @@ public: metadata.end_of_burst = true; break; } + // Return if the end-of-vector position array has been exhausted + if (eov_positions.data() and eov_positions.remaining() == 0) { + break; + } } return total_samps_recv; @@ -246,13 +254,15 @@ private: UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs, const size_t nsamps_per_buff, uhd::rx_metadata_t& metadata, + detail::eov_data_wrapper& eov_positions, const int32_t timeout_ms, const size_t buffer_offset_bytes = 0) { if (_buff_samps_remaining == 0) { // Current set of buffers has expired, get the next one _buff_samps_remaining = - _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); + _zero_copy_streamer.get_recv_buffs( + _in_buffs, metadata, eov_positions, timeout_ms); _fragment_offset_in_samps = 0; } else { // There are samples still left in the current set of buffers diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp index 4da925062..98588c6f5 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -18,6 +18,66 @@ namespace uhd { namespace transport { +namespace detail { + +class eov_data_wrapper +{ +public: + eov_data_wrapper(uhd::rx_metadata_t& metadata) + : _metadata(metadata) + , _data(metadata.eov_positions) + , _size(metadata.eov_positions_size) + , _remaining(metadata.eov_positions_size) + , _write_pos(0) + , _running_sample_count(0) + { + } + + ~eov_data_wrapper() + { + _metadata.eov_positions = _data; + _metadata.eov_positions_size = _size; + _metadata.eov_positions_count = _write_pos; + } + + UHD_FORCE_INLINE size_t* data() const + { + return _data; + } + + UHD_FORCE_INLINE size_t remaining() const + { + return _remaining; + } + + UHD_FORCE_INLINE void push_back(size_t value) + { + assert(_data && _remaining > 0); + _data[_write_pos++] = value; + _remaining--; + } + + UHD_FORCE_INLINE void update_running_sample_count(size_t num_samples) + { + _running_sample_count += num_samples; + } + + UHD_FORCE_INLINE size_t get_running_sample_count() const + { + return _running_sample_count; + } + +private: + uhd::rx_metadata_t& _metadata; + size_t* _data; + size_t _size; + size_t _remaining; + size_t _write_pos; + size_t _running_sample_count; +}; + +} // namespace uhd::transport::detail + /*! * Implementation of rx streamer manipulation of frame buffers and packet info. * This class is part of rx_streamer_impl, split into a separate unit as it is @@ -115,6 +175,7 @@ public: */ size_t get_recv_buffs(std::vector& buffs, rx_metadata_t& metadata, + detail::eov_data_wrapper& eov_positions, const int32_t timeout_ms) { // Function to set metadata based on alignment error @@ -184,10 +245,14 @@ public: // Get payload pointers for each buffer and aggregate eob. We set eob to // true if any channel has it set, since no more data will be received for // that channel. In most cases, all channels should have the same value. + // We do the same for eov here, as it is expected that eov will be the + // same for all channels. bool eob = false; + bool eov = false; for (size_t i = 0; i < buffs.size(); i++) { buffs[i] = _infos[i].payload; eob |= _infos[i].eob; + eov |= _infos[i].eov; } // Set the metadata from the buffer information at index zero @@ -199,10 +264,21 @@ public: metadata.end_of_burst = eob; metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + // If the caller wants eov indications via metadata, then check + // eov and set the metadata values appropriately. Note that only + // channel 0 is checked for eov--in most cases, it should be the + // same for all channels. + if (eov_positions.data() && eov) { + eov_positions.push_back( + eov_positions.get_running_sample_count() + + info_0.payload_bytes / _bytes_per_item); + } + // Done with these packets, save timestamp info for next call _last_read_time_info.has_time_spec = metadata.has_time_spec; _last_read_time_info.time_spec = metadata.time_spec; _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item; + eov_positions.update_running_sample_count(_last_read_time_info.num_samps); return _last_read_time_info.num_samps; } @@ -293,6 +369,9 @@ private: // Information about the last data packet processed last_read_time_info_t _last_read_time_info; + // Total number of samples read, used in determining EOV positions + size_t _total_num_samps = 0; + // Flag that indicates an overrun occurred. The streamer will return an // overrun error when no more packets are available. std::atomic _stopped_due_to_overrun{false}; diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index fa84026fe..c594dd530 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -60,6 +60,41 @@ private: uhd::tx_metadata_t _metadata_cache; }; +class tx_eov_data_wrapper +{ +public: + tx_eov_data_wrapper(const uhd::tx_metadata_t& metadata) + : _eov_positions(metadata.eov_positions) + , _eov_positions_size(metadata.eov_positions_size) + , _remaining(metadata.eov_positions_size) + , _read_pos(0) + { + } + + UHD_FORCE_INLINE size_t* data() const + { + return _eov_positions; + } + + UHD_FORCE_INLINE size_t remaining() const + { + return _remaining; + } + + UHD_FORCE_INLINE size_t pop_front() + { + assert(_eov_positions && _remaining > 0); + _remaining--; + return _eov_positions[_read_pos++]; + } + +private: + size_t* _eov_positions; + size_t _eov_positions_size; + size_t _remaining; + size_t _read_pos; +}; + } // namespace detail /*! @@ -125,60 +160,157 @@ public: _metadata_cache.check(metadata); - const int32_t timeout_ms = static_cast(timeout * 1000); + const bool eob_on_last_packet = metadata.end_of_burst; - if (nsamps_per_buff == 0) { - // Send requests with no samples are handled here, such as end of - // burst. Send packets need to have at least one sample based on the - // chdr specification, so we use _zero_buffs here. - _send_one_packet(_zero_buffs.data(), - 0, // buffer offset - 1, // num samples - metadata, - timeout_ms); + const int32_t timeout_ms = static_cast(timeout * 1000); - return 0; - } else if (nsamps_per_buff <= _spp) { - return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + detail::tx_eov_data_wrapper eov_positions(metadata); + + // If there are EOVs specified in the metadata, it will be necessary + // to break up the packet sends based on where the EOVs should be + // generated in the sequence of packets. + // + // `nsamps_to_send_remaining` represents the total number of + // samples remaining to send to fulfill the caller's request. + size_t nsamps_to_send_remaining = nsamps_per_buff; + + // `nsamps_to_send` represents a subset of the total number of + // samples to send based on whether or not the caller's metadata + // specifies EOV positions. + // * If there are no EOVs, it represents the entire send request + // made by the caller. It may be broken up into chunks no larger + // than _spp later on in the function, but it will not be broken up + // due to EOV. There will only be one iteration through the do/ + // while loop. + // * If there are EOVs, `nsamps_to_send` represents the number of + // samples to send to get to the next EOV position. Again, it may + // be broken up into chunks no larger than _spp, but note that the + // final chunk will have EOV signalled in its header. There may be + // multiple iterations through the do/while loop to fulfill the + // caller's entire send request. + size_t nsamps_to_send; + + // `num_samps_sent` is the return value from each individual call + // to `_send_one_packet()`. + size_t num_samps_sent = 0; + + // `total_nsamps_sent` accumulates the total number of samples sent + // in each chunk, and is used to determine the offset within `buffs` + // to pass to `_send_one_packet()`. + size_t total_nsamps_sent = 0; + + size_t last_eov_position = 0; + bool eov; + + do { + if (eov_positions.data() and eov_positions.remaining() > 0) { + size_t next_eov_position = eov_positions.pop_front(); + // Check basic requirements: EOV positions must be monotonically + // increasing + if (next_eov_position <= last_eov_position) { + throw uhd::value_error("Invalid EOV position specified " + "(violates eov_pos[n] > eov_pos[n-1])"); + } + // EOV position must be within the range of the samples written + if (next_eov_position > nsamps_per_buff) { + throw uhd::value_error("Invalid EOV position specified " + "(violates eov_pos[n] <= nsamps_per_buff)"); + } + nsamps_to_send = next_eov_position - last_eov_position; + eov = true; + } else { + // No EOVs, or the EOV position list has been exhausted: + // simply send the remaining samples + nsamps_to_send = nsamps_to_send_remaining; + eov = false; + } - } else { - size_t total_num_samps_sent = 0; - const bool eob = metadata.end_of_burst; - metadata.end_of_burst = false; + if (nsamps_to_send == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + false, + timeout_ms); - const size_t num_fragments = (nsamps_per_buff - 1) / _spp; - const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + return 0; - for (size_t i = 0; i < num_fragments; i++) { - const size_t num_samps_sent = _send_one_packet( - buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + } else if (nsamps_to_send <= _spp) { + // If last packet, apply saved EOB state to metadata + metadata.end_of_burst = + (eob_on_last_packet and nsamps_to_send == nsamps_to_send_remaining); - total_num_samps_sent += num_samps_sent; + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, nsamps_to_send, metadata, eov, timeout_ms); - if (num_samps_sent == 0) { - return total_num_samps_sent; + metadata.start_of_burst = false; + } else { + // Note: since `nsamps_to_send` is guaranteed to be > _spp + // if the code reaches this else clause, `num_fragments` will + // always be at least 1. + const size_t num_fragments = (nsamps_to_send - 1) / _spp; + const size_t final_length = ((nsamps_to_send - 1) % _spp) + 1; + + metadata.end_of_burst = false; + + for (size_t i = 0; i < num_fragments; i++) { + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, _spp, metadata, false, timeout_ms); + + // Advance sample accumulator and decrement remaining + // samples for this segment + total_nsamps_sent += num_samps_sent; + nsamps_to_send_remaining -= num_samps_sent; + + if (num_samps_sent == 0) { + return total_nsamps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; } - // Setup timespec for the next fragment - if (metadata.has_time_spec) { - metadata.time_spec = - metadata.time_spec - + time_spec_t::from_ticks(num_samps_sent, _samp_rate); - } + // Send the final fragment + metadata.end_of_burst = + (eob_on_last_packet and final_length == nsamps_to_send_remaining); - metadata.start_of_burst = false; + num_samps_sent = _send_one_packet( + buffs, total_nsamps_sent, final_length, metadata, eov, timeout); } - // Send the final fragment - metadata.end_of_burst = eob; + // Advance sample accumulator and decrement remaining samples + total_nsamps_sent += num_samps_sent; + nsamps_to_send_remaining -= num_samps_sent; - size_t nsamps_sent = - total_num_samps_sent - + _send_one_packet( - buffs, total_num_samps_sent, final_length, metadata, timeout); + // Loop exit condition: return from `_send_one_packet()` indicates + // an error + if (num_samps_sent == 0) { + break; + } - return nsamps_sent; - } + // If there are more samples to be sent, thus requiring another + // trip around the do/while loop, update the timespec in the + // metadata for the next fragment (if desired) + if (nsamps_to_send_remaining > 0 and metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + last_eov_position = total_nsamps_sent; + + } while (nsamps_to_send_remaining > 0); + + return total_nsamps_sent; } protected: @@ -233,10 +365,11 @@ private: const size_t buffer_offset_in_samps, const size_t num_samples, const tx_metadata_t& metadata, + const bool eov, const int32_t timeout_ms) { if (!_zero_copy_streamer.get_send_buffs( - _out_buffs, num_samples, metadata, timeout_ms)) { + _out_buffs, num_samples, metadata, eov, timeout_ms)) { return 0; } diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp index 5ac7a1e8c..aa36c9d3b 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -75,12 +75,14 @@ public: * \param buffs returns a pointer to the buffer data * \param nsamps_per_buff the number of samples that will be written to each buffer * \param metadata the metadata to write to the packet header + * \param eov EOV flag to write to the packet header * \param timeout_ms timeout in milliseconds * \return true if the operation was sucessful, false if timeout occurs */ UHD_FORCE_INLINE bool get_send_buffs(std::vector& buffs, const size_t nsamps_per_buff, const tx_metadata_t& metadata, + const bool eov, const int32_t timeout_ms) { // Try to get a buffer per channel @@ -106,6 +108,7 @@ public: info.payload_bytes = nsamps_per_buff * _bytes_per_item; info.eob = metadata.end_of_burst; + info.eov = eov; // Write packet header for (size_t i = 0; i < buffs.size(); i++) { -- cgit v1.2.3