diff options
Diffstat (limited to 'host/lib')
7 files changed, 279 insertions, 50 deletions
| diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp index 9c5d88066..3333f4f9d 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -108,6 +108,7 @@ public:      struct packet_info_t      {          bool eob             = false; +        bool eov             = false;          bool has_tsf         = false;          uint64_t tsf         = 0;          size_t payload_bytes = 0; @@ -347,6 +348,7 @@ private:          packet_info_t info;          info.eob           = header.get_eob(); +        info.eov           = header.get_eov();          info.has_tsf       = optional_time.is_initialized();          info.tsf           = optional_time ? *optional_time : 0;          info.payload_bytes = _recv_packet->get_payload_size(); diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 1e54a2f7a..8658767b6 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -112,6 +112,7 @@ public:      struct packet_info_t      {          bool eob             = false; +        bool eov             = false;          bool has_tsf         = false;          uint64_t tsf         = 0;          size_t payload_bytes = 0; @@ -226,6 +227,7 @@ public:          }          _send_header.set_eob(info.eob); +        _send_header.set_eov(info.eov);          _send_header.set_seq_num(_data_seq_num++);          _send_packet->refresh(buff->data(), _send_header, tsf); diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp index 0691138e6..04cd12c59 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -139,10 +139,13 @@ public:          const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); +        detail::eov_data_wrapper eov_positions(metadata); +          size_t total_samps_recv = -            _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); +            _recv_one_packet(buffs, nsamps_per_buff, metadata, eov_positions, timeout_ms); -        if (one_packet or metadata.end_of_burst) { +        if (one_packet or metadata.end_of_burst or +            (eov_positions.data() and eov_positions.remaining() == 0)) {              return total_samps_recv;          } @@ -153,13 +156,14 @@ public:          // Loop until buffer is filled or error code. This method returns the          // metadata from the first packet received, with the exception of -        // end-of-burst. +        // end-of-burst and end-of-vector indications (if requested).          uhd::rx_metadata_t loop_metadata;          while (total_samps_recv < nsamps_per_buff) {              size_t num_samps = _recv_one_packet(buffs,                  nsamps_per_buff - total_samps_recv,                  loop_metadata, +                eov_positions,                  timeout_ms,                  total_samps_recv * _convert_info.bytes_per_cpu_item); @@ -176,6 +180,10 @@ public:                  metadata.end_of_burst = true;                  break;              } +            // Return if the end-of-vector position array has been exhausted +            if (eov_positions.data() and eov_positions.remaining() == 0) { +                break; +            }          }          return total_samps_recv; @@ -246,13 +254,15 @@ private:      UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,          const size_t nsamps_per_buff,          uhd::rx_metadata_t& metadata, +        detail::eov_data_wrapper& eov_positions,          const int32_t timeout_ms,          const size_t buffer_offset_bytes = 0)      {          if (_buff_samps_remaining == 0) {              // Current set of buffers has expired, get the next one              _buff_samps_remaining = -                _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); +                _zero_copy_streamer.get_recv_buffs( +                    _in_buffs, metadata, eov_positions, timeout_ms);              _fragment_offset_in_samps = 0;          } else {              // There are samples still left in the current set of buffers diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp index 4da925062..98588c6f5 100644 --- a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -18,6 +18,66 @@  namespace uhd { namespace transport { +namespace detail { + +class eov_data_wrapper +{ +public: +    eov_data_wrapper(uhd::rx_metadata_t& metadata) +        : _metadata(metadata) +        , _data(metadata.eov_positions) +        , _size(metadata.eov_positions_size) +        , _remaining(metadata.eov_positions_size) +        , _write_pos(0) +        , _running_sample_count(0) +    { +    } + +    ~eov_data_wrapper() +    { +        _metadata.eov_positions = _data; +        _metadata.eov_positions_size = _size; +        _metadata.eov_positions_count = _write_pos; +    } + +    UHD_FORCE_INLINE size_t* data() const +    { +        return _data; +    } + +    UHD_FORCE_INLINE size_t remaining() const +    { +        return _remaining; +    } + +    UHD_FORCE_INLINE void push_back(size_t value) +    { +        assert(_data && _remaining > 0); +        _data[_write_pos++] = value; +        _remaining--; +    } + +    UHD_FORCE_INLINE void update_running_sample_count(size_t num_samples) +    { +        _running_sample_count += num_samples; +    } + +    UHD_FORCE_INLINE size_t get_running_sample_count() const +    { +        return _running_sample_count; +    } + +private: +    uhd::rx_metadata_t& _metadata; +    size_t*            _data; +    size_t             _size; +    size_t             _remaining; +    size_t             _write_pos; +    size_t             _running_sample_count; +}; + +} // namespace uhd::transport::detail +  /*!   * Implementation of rx streamer manipulation of frame buffers and packet info.   * This class is part of rx_streamer_impl, split into a separate unit as it is @@ -115,6 +175,7 @@ public:       */      size_t get_recv_buffs(std::vector<const void*>& buffs,          rx_metadata_t& metadata, +        detail::eov_data_wrapper& eov_positions,          const int32_t timeout_ms)      {          // Function to set metadata based on alignment error @@ -184,10 +245,14 @@ public:          // Get payload pointers for each buffer and aggregate eob. We set eob to          // true if any channel has it set, since no more data will be received for          // that channel. In most cases, all channels should have the same value. +        // We do the same for eov here, as it is expected that eov will be the +        // same for all channels.          bool eob = false; +        bool eov = false;          for (size_t i = 0; i < buffs.size(); i++) {              buffs[i] = _infos[i].payload;              eob |= _infos[i].eob; +            eov |= _infos[i].eov;          }          // Set the metadata from the buffer information at index zero @@ -199,10 +264,21 @@ public:          metadata.end_of_burst   = eob;          metadata.error_code     = rx_metadata_t::ERROR_CODE_NONE; +        // If the caller wants eov indications via metadata, then check +        // eov and set the metadata values appropriately. Note that only +        // channel 0 is checked for eov--in most cases, it should be the +        // same for all channels. +        if (eov_positions.data() && eov) { +            eov_positions.push_back( +                eov_positions.get_running_sample_count() + +                info_0.payload_bytes / _bytes_per_item); +        } +          // Done with these packets, save timestamp info for next call          _last_read_time_info.has_time_spec = metadata.has_time_spec;          _last_read_time_info.time_spec     = metadata.time_spec;          _last_read_time_info.num_samps     = info_0.payload_bytes / _bytes_per_item; +        eov_positions.update_running_sample_count(_last_read_time_info.num_samps);          return _last_read_time_info.num_samps;      } @@ -293,6 +369,9 @@ private:      // Information about the last data packet processed      last_read_time_info_t _last_read_time_info; +    // Total number of samples read, used in determining EOV positions +    size_t _total_num_samps = 0; +      // Flag that indicates an overrun occurred. The streamer will return an      // overrun error when no more packets are available.      std::atomic<bool> _stopped_due_to_overrun{false}; diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index fa84026fe..c594dd530 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -60,6 +60,41 @@ private:      uhd::tx_metadata_t _metadata_cache;  }; +class tx_eov_data_wrapper +{ +public: +    tx_eov_data_wrapper(const uhd::tx_metadata_t& metadata) +        : _eov_positions(metadata.eov_positions) +        , _eov_positions_size(metadata.eov_positions_size) +        , _remaining(metadata.eov_positions_size) +        , _read_pos(0) +    { +    } + +    UHD_FORCE_INLINE size_t* data() const +    { +        return _eov_positions; +    } + +    UHD_FORCE_INLINE size_t remaining() const +    { +        return _remaining; +    } + +    UHD_FORCE_INLINE size_t pop_front() +    { +        assert(_eov_positions && _remaining > 0); +        _remaining--; +        return _eov_positions[_read_pos++]; +    } + +private: +    size_t* _eov_positions; +    size_t  _eov_positions_size; +    size_t  _remaining; +    size_t  _read_pos; +}; +  } // namespace detail  /*! @@ -125,60 +160,157 @@ public:          _metadata_cache.check(metadata); -        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); +        const bool eob_on_last_packet = metadata.end_of_burst; -        if (nsamps_per_buff == 0) { -            // Send requests with no samples are handled here, such as end of -            // burst. Send packets need to have at least one sample based on the -            // chdr specification, so we use _zero_buffs here. -            _send_one_packet(_zero_buffs.data(), -                0, // buffer offset -                1, // num samples -                metadata, -                timeout_ms); +        const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); -            return 0; -        } else if (nsamps_per_buff <= _spp) { -            return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); +        detail::tx_eov_data_wrapper eov_positions(metadata); + +        // If there are EOVs specified in the metadata, it will be necessary +        // to break up the packet sends based on where the EOVs should be +        // generated in the sequence of packets. +        // +        // `nsamps_to_send_remaining` represents the total number of +        // samples remaining to send to fulfill the caller's request. +        size_t nsamps_to_send_remaining = nsamps_per_buff; + +        // `nsamps_to_send` represents a subset of the total number of +        // samples to send based on whether or not the caller's metadata +        // specifies EOV positions. +        // * If there are no EOVs, it represents the entire send request +        //   made by the caller. It may be broken up into chunks no larger +        //   than _spp later on in the function, but it will not be broken up +        //   due to EOV. There will only be one iteration through the do/ +        //   while loop. +        // * If there are EOVs, `nsamps_to_send` represents the number of +        //   samples to send to get to the next EOV position. Again, it may +        //   be broken up into chunks no larger than _spp, but note that the +        //   final chunk will have EOV signalled in its header. There may be +        //   multiple iterations through the do/while loop to fulfill the +        //   caller's entire send request. +        size_t nsamps_to_send; + +        // `num_samps_sent` is the return value from each individual call +        // to `_send_one_packet()`. +        size_t num_samps_sent = 0; + +        // `total_nsamps_sent` accumulates the total number of samples sent +        // in each chunk, and is used to determine the offset within `buffs` +        // to pass to `_send_one_packet()`. +        size_t total_nsamps_sent = 0; + +        size_t last_eov_position = 0; +        bool eov; + +        do { +            if (eov_positions.data() and eov_positions.remaining() > 0) { +                size_t next_eov_position = eov_positions.pop_front(); +                // Check basic requirements: EOV positions must be monotonically +                // increasing +                if (next_eov_position <= last_eov_position) { +                    throw uhd::value_error("Invalid EOV position specified " +                                           "(violates eov_pos[n] > eov_pos[n-1])"); +                } +                // EOV position must be within the range of the samples written +                if (next_eov_position > nsamps_per_buff) { +                    throw uhd::value_error("Invalid EOV position specified " +                                           "(violates eov_pos[n] <= nsamps_per_buff)"); +                } +                nsamps_to_send = next_eov_position - last_eov_position; +                eov            = true; +            } else { +                // No EOVs, or the EOV position list has been exhausted: +                // simply send the remaining samples +                nsamps_to_send = nsamps_to_send_remaining; +                eov            = false; +            } -        } else { -            size_t total_num_samps_sent = 0; -            const bool eob              = metadata.end_of_burst; -            metadata.end_of_burst       = false; +            if (nsamps_to_send == 0) { +                // Send requests with no samples are handled here, such as end of +                // burst. Send packets need to have at least one sample based on the +                // chdr specification, so we use _zero_buffs here. +                _send_one_packet(_zero_buffs.data(), +                    0, // buffer offset +                    1, // num samples +                    metadata, +                    false, +                    timeout_ms); -            const size_t num_fragments = (nsamps_per_buff - 1) / _spp; -            const size_t final_length  = ((nsamps_per_buff - 1) % _spp) + 1; +                return 0; -            for (size_t i = 0; i < num_fragments; i++) { -                const size_t num_samps_sent = _send_one_packet( -                    buffs, total_num_samps_sent, _spp, metadata, timeout_ms); +            } else if (nsamps_to_send <= _spp) { +                // If last packet, apply saved EOB state to metadata +                metadata.end_of_burst = +                    (eob_on_last_packet and nsamps_to_send == nsamps_to_send_remaining); -                total_num_samps_sent += num_samps_sent; +                num_samps_sent = _send_one_packet( +                    buffs, total_nsamps_sent, nsamps_to_send, metadata, eov, timeout_ms); -                if (num_samps_sent == 0) { -                    return total_num_samps_sent; +                metadata.start_of_burst = false; +            } else { +                // Note: since `nsamps_to_send` is guaranteed to be > _spp +                // if the code reaches this else clause, `num_fragments` will +                // always be at least 1. +                const size_t num_fragments = (nsamps_to_send - 1) / _spp; +                const size_t final_length  = ((nsamps_to_send - 1) % _spp) + 1; + +                metadata.end_of_burst = false; + +                for (size_t i = 0; i < num_fragments; i++) { +                    num_samps_sent = _send_one_packet( +                        buffs, total_nsamps_sent, _spp, metadata, false, timeout_ms); + +                    // Advance sample accumulator and decrement remaining +                    // samples for this segment +                    total_nsamps_sent += num_samps_sent; +                    nsamps_to_send_remaining -= num_samps_sent; + +                    if (num_samps_sent == 0) { +                        return total_nsamps_sent; +                    } + +                    // Setup timespec for the next fragment +                    if (metadata.has_time_spec) { +                        metadata.time_spec = +                            metadata.time_spec +                            + time_spec_t::from_ticks(num_samps_sent, _samp_rate); +                    } + +                    metadata.start_of_burst = false;                  } -                // Setup timespec for the next fragment -                if (metadata.has_time_spec) { -                    metadata.time_spec = -                        metadata.time_spec -                        + time_spec_t::from_ticks(num_samps_sent, _samp_rate); -                } +                // Send the final fragment +                metadata.end_of_burst = +                    (eob_on_last_packet and final_length == nsamps_to_send_remaining); -                metadata.start_of_burst = false; +                num_samps_sent = _send_one_packet( +                    buffs, total_nsamps_sent, final_length, metadata, eov, timeout);              } -            // Send the final fragment -            metadata.end_of_burst = eob; +            // Advance sample accumulator and decrement remaining samples +            total_nsamps_sent += num_samps_sent; +            nsamps_to_send_remaining -= num_samps_sent; -            size_t nsamps_sent = -                total_num_samps_sent -                + _send_one_packet( -                      buffs, total_num_samps_sent, final_length, metadata, timeout); +            // Loop exit condition: return from `_send_one_packet()` indicates +            // an error +            if (num_samps_sent == 0) { +                break; +            } -            return nsamps_sent; -        } +            // If there are more samples to be sent, thus requiring another +            // trip around the do/while loop, update the timespec in the +            // metadata for the next fragment (if desired) +            if (nsamps_to_send_remaining > 0 and metadata.has_time_spec) { +                metadata.time_spec = +                    metadata.time_spec +                    + time_spec_t::from_ticks(num_samps_sent, _samp_rate); +            } + +            last_eov_position = total_nsamps_sent; + +        } while (nsamps_to_send_remaining > 0); + +        return total_nsamps_sent;      }  protected: @@ -233,10 +365,11 @@ private:          const size_t buffer_offset_in_samps,          const size_t num_samples,          const tx_metadata_t& metadata, +        const bool eov,          const int32_t timeout_ms)      {          if (!_zero_copy_streamer.get_send_buffs( -                _out_buffs, num_samples, metadata, timeout_ms)) { +                _out_buffs, num_samples, metadata, eov, timeout_ms)) {              return 0;          } diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp index 5ac7a1e8c..aa36c9d3b 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -75,12 +75,14 @@ public:       * \param buffs returns a pointer to the buffer data       * \param nsamps_per_buff the number of samples that will be written to each buffer       * \param metadata the metadata to write to the packet header +     * \param eov EOV flag to write to the packet header       * \param timeout_ms timeout in milliseconds       * \return true if the operation was sucessful, false if timeout occurs       */      UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,          const size_t nsamps_per_buff,          const tx_metadata_t& metadata, +        const bool eov,          const int32_t timeout_ms)      {          // Try to get a buffer per channel @@ -106,6 +108,7 @@ public:          info.payload_bytes = nsamps_per_buff * _bytes_per_item;          info.eob           = metadata.end_of_burst; +        info.eov           = eov;          // Write packet header          for (size_t i = 0; i < buffs.size(); i++) { diff --git a/host/lib/types/types.cpp b/host/lib/types/types.cpp index aa87f30a5..7e947a4f8 100644 --- a/host/lib/types/types.cpp +++ b/host/lib/types/types.cpp @@ -24,11 +24,11 @@ stream_cmd_t::stream_cmd_t(const stream_mode_t &stream_mode):  /***********************************************************************   * metadata   **********************************************************************/ -tx_metadata_t::tx_metadata_t(void): -    has_time_spec(false), -    time_spec(time_spec_t()), -    start_of_burst(false), -    end_of_burst(false) +tx_metadata_t::tx_metadata_t(void) +    : has_time_spec(false) +    , time_spec(time_spec_t()) +    , start_of_burst(false) +    , end_of_burst(false)  {      /* NOP */  } | 
