From 75a090543b8fb8e7c875387eee6d3fe7227e4450 Mon Sep 17 00:00:00 2001 From: Ciro Nishiguchi Date: Thu, 23 May 2019 20:38:07 -0500 Subject: rfnoc: add rx and tx transports, and amend rfnoc_graph transports: Transports build on I/O service and implements flow control and sequence number checking. The rx streamer subclass extends the streamer implementation to connect it to the rfnoc graph. It receives configuration values from property propagation and configures the streamer accordingly. It also implements the issue_stream_cmd rx_streamer API method. Add implementation of rx streamer creation and method to connect it to an rfnoc block. rfnoc_graph: Cache more connection info, clarify contract Summary of changes: - rfnoc_graph stores more information about static connections at the beginning. Some search algorithms are replaced by simpler lookups. - The contract for connect() was clarified. It is required to call connect, even for static connections. --- .../include/uhdlib/transport/get_aligned_buffs.hpp | 175 +++++++++++ .../include/uhdlib/transport/rx_streamer_impl.hpp | 341 +++++++++++++++++++++ .../uhdlib/transport/rx_streamer_zero_copy.hpp | 207 +++++++++++++ .../include/uhdlib/transport/tx_streamer_impl.hpp | 307 +++++++++++++++++++ .../uhdlib/transport/tx_streamer_zero_copy.hpp | 147 +++++++++ 5 files changed, 1177 insertions(+) create mode 100644 host/lib/include/uhdlib/transport/get_aligned_buffs.hpp create mode 100644 host/lib/include/uhdlib/transport/rx_streamer_impl.hpp create mode 100644 host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp create mode 100644 host/lib/include/uhdlib/transport/tx_streamer_impl.hpp create mode 100644 host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp (limited to 'host/lib/include/uhdlib/transport') diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include +#include +#include +#include + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template +class get_aligned_buffs +{ +public: + enum alignment_result_t { + SUCCESS, + TIMEOUT, + SEQUENCE_ERROR, + ALIGNMENT_FAILURE, + BAD_PACKET + }; + + get_aligned_buffs(std::vector& xports, + std::vector& frame_buffs, + std::vector& infos) + : _xports(xports) + , _frame_buffs(frame_buffs) + , _infos(infos) + , _prev_tsf(_xports.size(), 0) + , _channels_to_align(_xports.size()) + { + } + + alignment_result_t operator()(const int32_t timeout_ms) + { + // Clear state + _channels_to_align.set(); + bool time_valid = false; + uint64_t tsf = 0; + size_t iterations = 0; + + while (_channels_to_align.any()) { + const size_t chan = _channels_to_align.find_first(); + auto& xport = _xports[chan]; + auto& info = _infos[chan]; + auto& frame_buff = _frame_buffs[chan]; + bool seq_error = false; + + // Receive a data packet for the channel if we don't have one. A + // packet may already be there if the previous call was interrupted + // by an error. + if (!frame_buff) { + try { + std::tie(frame_buff, info, seq_error) = + xport->get_recv_buff(timeout_ms); + } catch (const uhd::value_error& e) { + // Bad packet + UHD_LOGGER_ERROR("STREAMER") + << boost::format( + "The receive transport caught a value exception.\n%s") + % e.what(); + return BAD_PACKET; + } + } + + if (!frame_buff) { + return TIMEOUT; + } + + if (info.has_tsf) { + const bool time_out_of_order = _prev_tsf[chan] > info.tsf; + _prev_tsf[chan] = info.tsf; + + // If the user changes the device time while streaming, we can + // receive a packet that comes before the previous packet in + // time. This would cause the alignment logic to discard future + // received packets. Therefore, when this occurs, we reset the + // info to restart the alignment. + if (time_out_of_order) { + time_valid = false; + } + + // Check if the time is larger than packets received for other + // channels, and if so, use this time to align all channels + if (!time_valid || info.tsf > tsf) { + // If we haven't found a set of aligned packets after many + // iterations, return an alignment failure + if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { + UHD_LOGGER_ERROR("STREAMER") + << "The rx streamer failed to time-align packets."; + return ALIGNMENT_FAILURE; + } + + // Release buffers for channels aligned previously. Keep + // buffers that don't have a tsf since we don't need to + // align those. + for (size_t i = 0; i < _xports.size(); i++) { + if (!_channels_to_align.test(i) && _infos[i].has_tsf) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + _frame_buffs[i] = nullptr; + } + } + + // Mark only this channel as aligned and save its tsf + _channels_to_align.set(); + _channels_to_align.reset(chan); + time_valid = true; + tsf = info.tsf; + } + + // Check if the time matches that of other aligned channels + else if (info.tsf == tsf) { + _channels_to_align.reset(chan); + } + + // Otherwise, time is smaller than other channels, release the buffer + else { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + } else { + // Packet doesn't have a tsf, just mark it as aligned + _channels_to_align.reset(chan); + } + + // If this packet had a sequence error, stop to return the error. + // Keep the packet for the next call to get_aligned_buffs. + if (seq_error) { + return SEQUENCE_ERROR; + } + } + + // All channels aligned + return SUCCESS; + } + +private: + // Transports for each channel + std::vector& _xports; + + // Storage for buffers resulting from alignment + std::vector& _frame_buffs; + + // Packet info corresponding to aligned buffers + std::vector& _infos; + + // Time of previous packet for each channel + std::vector _prev_tsf; + + // Keeps track of channels that are aligned + boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp new file mode 100644 index 000000000..d66e867bc --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -0,0 +1,341 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for error handling + * + * If a recv call reads data from multiple packets, and an error occurs in the + * second or later packets, recv stops short of the num samps requested and + * returns no error. The error is cached for the next call to recv. + * + * Timeout errors are an exception. Timeouts that occur in the second or later + * packets of a recv call stop the recv method but the error is not returned in + * the next call. The user can check for this condition since fewer samples are + * returned than the number requested. + */ +class rx_metadata_cache +{ +public: + //! Stores metadata in the cache, ignoring timeout errors + UHD_FORCE_INLINE void store(const rx_metadata_t& metadata) + { + if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) { + _metadata_cache = metadata; + _cached_metadata = true; + } + } + + //! Checks for cached metadata + UHD_FORCE_INLINE bool check(rx_metadata_t& metadata) + { + if (_cached_metadata) { + metadata = _metadata_cache; + _cached_metadata = false; + return true; + } + return false; + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::rx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of rx streamer API + */ +template +class rx_streamer_impl : public rx_streamer +{ +public: + //! Constructor + rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_ports) + , _in_buffs(num_ports) + { + if (stream_args.cpu_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a cpu_format!"); + } + if (stream_args.otw_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a otw_format!"); + } + _setup_converters(num_ports, stream_args); + _zero_copy_streamer.set_samp_rate(_samp_rate); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + //! Implementation of rx_streamer API method + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + //! Implementation of rx_streamer API method + size_t get_max_num_samps() const + { + return _spp; + } + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + //! Implementation of rx_streamer API method + UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const double timeout, + const bool one_packet) + { + if (_error_metadata_cache.check(metadata)) { + return 0; + } + + const int32_t timeout_ms = static_cast(timeout * 1000); + + size_t total_samps_recv = + _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); + + if (one_packet or metadata.end_of_burst) { + return total_samps_recv; + } + + // First set of packets recv had an error, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + return total_samps_recv; + } + + // Loop until buffer is filled or error code. This method returns the + // metadata from the first packet received, with the exception of + // end-of-burst. + uhd::rx_metadata_t loop_metadata; + + while (total_samps_recv < nsamps_per_buff) { + size_t num_samps = _recv_one_packet(buffs, + nsamps_per_buff - total_samps_recv, + loop_metadata, + timeout_ms, + total_samps_recv * _convert_info.bytes_per_cpu_item); + + // If metadata had an error code set, store for next call and return + if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + _error_metadata_cache.store(loop_metadata); + break; + } + + total_samps_recv += num_samps; + + // Return immediately if end of burst + if (loop_metadata.end_of_burst) { + metadata.end_of_burst = true; + break; + } + } + + return total_samps_recv; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + _zero_copy_streamer.set_samp_rate(rate); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Receive a single packet + UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const int32_t timeout_ms, + const size_t buffer_offset_bytes = 0) + { + if (_buff_samps_remaining == 0) { + // Current set of buffers has expired, get the next one + _buff_samps_remaining = + _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); + _fragment_offset_in_samps = 0; + } else { + // There are samples still left in the current set of buffers + metadata = _last_fragment_metadata; + metadata.time_spec += time_spec_t::from_ticks( + _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate); + } + + if (_buff_samps_remaining != 0) { + const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining); + + // Convert samples to the streamer's output format + for (size_t i = 0; i < get_num_channels(); i++) { + char* b = reinterpret_cast(buffs[i]); + const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes); + _convert_to_out_buff(out_buffs, i, num_samps); + } + + _buff_samps_remaining -= num_samps; + + // Write the fragment flags and offset + metadata.more_fragments = _buff_samps_remaining != 0; + metadata.fragment_offset = _fragment_offset_in_samps; + + if (metadata.more_fragments) { + _fragment_offset_in_samps += num_samps; + _last_fragment_metadata = metadata; + } + + return num_samps; + } else { + return 0; + } + } + + //! Convert samples for one channel into its buffer + UHD_FORCE_INLINE void _convert_to_out_buff( + const uhd::rx_streamer::buffs_type& out_buffs, + const size_t chan, + const size_t num_samps) + { + const char* buffer_ptr = reinterpret_cast(_in_buffs[chan]); + + _converters[chan]->conv(buffer_ptr, out_buffs, num_samps); + + // Advance the pointer for the source buffer + _in_buffs[chan] = + buffer_ptr + num_samps * _convert_info.bytes_per_otw_item; + + if (_buff_samps_remaining == num_samps) { + _zero_copy_streamer.release_recv_buff(chan); + } + } + + //! Create converters and initialize _convert_info + void _setup_converters(const size_t num_ports, + const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.otw_format + "_chdr"; + id.num_inputs = 1; + id.output_format = stream_args.cpu_format; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_ports; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(1 / 32767.0); + } + } + + // Converter and item sizes + convert_info _convert_info; + + // Converters + std::vector _converters; + + // Implementation of frame buffer management and packet info + rx_streamer_zero_copy _zero_copy_streamer; + + // Container for buffer pointers used in recv method + std::vector _in_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits::max(); + + // Num samps remaining in buffer currently held by zero copy streamer + size_t _buff_samps_remaining = 0; + + // Metadata cache for error handling + detail::rx_metadata_cache _error_metadata_cache; + + // Fragment (partially read packet) information + size_t _fragment_offset_in_samps = 0; + rx_metadata_t _last_fragment_metadata; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp new file mode 100644 index 000000000..36f568f2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -0,0 +1,207 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of rx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template +class rx_streamer_zero_copy +{ +public: + //! Constructor + rx_streamer_zero_copy(const size_t num_ports) + : _xports(num_ports) + , _frame_buffs(num_ports) + , _infos(num_ports) + , _get_aligned_buffs(_xports, _frame_buffs, _infos) + { + } + + ~rx_streamer_zero_copy() + { + for (size_t i = 0; i < _frame_buffs.size(); i++) { + if (_frame_buffs[i]) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + } + } + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of time-aligned buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param metadata returns the metadata corresponding to the buffer + * \param timeout_ms timeout in milliseconds + * \return the size in samples of each packet, or 0 if timeout + */ + size_t get_recv_buffs(std::vector& buffs, + rx_metadata_t& metadata, + const int32_t timeout_ms) + { + metadata.reset(); + + switch (_get_aligned_buffs(timeout_ms)) { + case get_aligned_buffs_t::SUCCESS: + break; + + case get_aligned_buffs_t::BAD_PACKET: + metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return 0; + + case get_aligned_buffs_t::TIMEOUT: + metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return 0; + + case get_aligned_buffs_t::ALIGNMENT_FAILURE: + metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + return 0; + + case get_aligned_buffs_t::SEQUENCE_ERROR: + metadata.has_time_spec = _last_read_time_info.has_time_spec; + metadata.time_spec = + _last_read_time_info.time_spec + + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); + metadata.out_of_sequence = true; + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + return 0; + + default: + UHD_THROW_INVALID_CODE_PATH(); + } + + // Get payload pointers for each buffer and aggregate eob. We set eob to + // true if any channel has it set, since no more data will be received for + // that channel. In most cases, all channels should have the same value. + bool eob = false; + for (size_t i = 0; i < buffs.size(); i++) { + buffs[i] = _infos[i].payload; + eob |= _infos[i].eob; + } + + // Set the metadata from the buffer information at index zero + const auto& info_0 = _infos[0]; + + metadata.has_time_spec = info_0.has_tsf; + metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate); + metadata.start_of_burst = false; + metadata.end_of_burst = eob; + metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + // Done with these packets, save timestamp info for next call + _last_read_time_info.has_time_spec = metadata.has_time_spec; + _last_read_time_info.time_spec = metadata.time_spec; + _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item; + + return _last_read_time_info.num_samps; + } + + /*! + * Release the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + void release_recv_buff(const size_t channel) + { + _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel])); + _frame_buffs[channel] = typename transport_t::buff_t::uptr(); + } + +private: + using get_aligned_buffs_t = get_aligned_buffs; + + // Information recorded by streamer about the last data packet processed, + // used to create the metadata when there is a sequence error. + struct last_read_time_info_t + { + size_t num_samps = 0; + bool has_time_spec = false; + time_spec_t time_spec; + }; + + // Transports for each channel + std::vector _xports; + + // Storage for buffers for each channel while they are in flight (between + // calls to get_recv_buff and release_recv_buff). + std::vector _frame_buffs; + + // Packet info corresponding to the packets in flight + std::vector _infos; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Rate used in conversion of timestamp to time_spec_t + double _samp_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Implementation of packet time alignment + get_aligned_buffs_t _get_aligned_buffs; + + // Information about the last data packet processed + last_read_time_info_t _last_read_time_info; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp new file mode 100644 index 000000000..60881dad2 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -0,0 +1,307 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for send calls with zero samples + * + * Metadata is cached when we get a send requesting a start of burst with no + * samples. It is applied here on the next call to send() that actually has + * samples to send. + */ +class tx_metadata_cache +{ +public: + //! Stores metadata in the cache + UHD_FORCE_INLINE void store(const tx_metadata_t& metadata) + { + _metadata_cache = metadata; + _cached_metadata = true; + } + + //! Checks for cached metadata + UHD_FORCE_INLINE void check(tx_metadata_t& metadata) + { + if (_cached_metadata) { + // Only use cached time_spec if metadata does not have one + if (!metadata.has_time_spec) { + metadata.has_time_spec = _metadata_cache.has_time_spec; + metadata.time_spec = _metadata_cache.time_spec; + } + metadata.start_of_burst = _metadata_cache.start_of_burst; + metadata.end_of_burst = _metadata_cache.end_of_burst; + _cached_metadata = false; + } + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::tx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of tx streamer API + */ +template +class tx_streamer_impl : public tx_streamer +{ +public: + tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_chans) + , _zero_buffs(num_chans, &_zero) + , _out_buffs(num_chans) + { + _setup_converters(num_chans, stream_args); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + _spp = stream_args.args.cast("spp", _spp); + } + + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + size_t get_max_num_samps() const + { + return _spp; + } + + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + size_t send(const uhd::tx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t& metadata_, + const double timeout) + { + uhd::tx_metadata_t metadata(metadata_); + + if (nsamps_per_buff == 0 && metadata.start_of_burst) { + _metadata_cache.store(metadata); + return 0; + } + + _metadata_cache.check(metadata); + + const int32_t timeout_ms = static_cast(timeout * 1000); + + if (nsamps_per_buff == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + timeout_ms); + + return 0; + } else if (nsamps_per_buff <= _spp) { + return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + + } else { + size_t total_num_samps_sent = 0; + const bool eob = metadata.end_of_burst; + metadata.end_of_burst = false; + + const size_t num_fragments = (nsamps_per_buff - 1) / _spp; + const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + + for (size_t i = 0; i < num_fragments; i++) { + const size_t num_samps_sent = _send_one_packet( + buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + + total_num_samps_sent += num_samps_sent; + + if (num_samps_sent == 0) { + return total_num_samps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; + } + + // Send the final fragment + metadata.end_of_burst = eob; + + size_t nsamps_sent = + total_num_samps_sent + + _send_one_packet( + buffs, total_num_samps_sent, final_length, metadata, timeout); + + return nsamps_sent; + } + } + + //! Implementation of rx_streamer API method + bool recv_async_msg( + uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) + { + // TODO: implement me + return false; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Convert samples for one channel and sends a packet + size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs, + const size_t buffer_offset_in_samps, + const size_t num_samples, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + if (!_zero_copy_streamer.get_send_buffs( + _out_buffs, num_samples, metadata, timeout_ms)) { + return 0; + } + + size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item; + + for (size_t i = 0; i < get_num_channels(); i++) { + const void* input_ptr = static_cast(buffs[i]) + byte_offset; + _converters[i]->conv(input_ptr, _out_buffs[i], num_samples); + + _zero_copy_streamer.release_send_buff(i); + } + + return num_samples; + } + + //! Create converters and initialize _bytes_per_cpu_item + void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.cpu_format; + id.num_inputs = 1; + id.output_format = stream_args.otw_format + "_chdr"; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_chans; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(32767.0); + } + } + + // Converter item sizes + convert_info _convert_info; + + // Converters + std::vector _converters; + + // Manages frame buffers and packet info + tx_streamer_zero_copy _zero_copy_streamer; + + // Buffer used to handle send calls with no data + std::vector _zero_buffs; + + const uint64_t _zero = 0; + + // Container for buffer pointers used in send method + std::vector _out_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits::max(); + + // Metadata cache for send calls with no data + detail::tx_metadata_cache _metadata_cache; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp new file mode 100644 index 000000000..1b6f55238 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -0,0 +1,147 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP + +#include +#include +#include +#include + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of tx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template +class tx_streamer_zero_copy +{ +public: + //! Constructor + tx_streamer_zero_copy(const size_t num_chans) + : _xports(num_chans), _frame_buffs(num_chans) + { + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of frame buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param nsamps_per_buff the number of samples that will be written to each buffer + * \param metadata the metadata to write to the packet header + * \param timeout_ms timeout in milliseconds + * \return true if the operation was sucessful, false if timeout occurs + */ + UHD_FORCE_INLINE bool get_send_buffs(std::vector& buffs, + const size_t nsamps_per_buff, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + // Try to get a buffer per channel + for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) { + _frame_buffs[_next_buff_to_get].first = + _xports[_next_buff_to_get]->get_send_buff(timeout_ms); + + if (!_frame_buffs[_next_buff_to_get].first) { + return false; + } + } + + // Got all the buffers, start from index 0 next call + _next_buff_to_get = 0; + + // Store portions of metadata we care about + typename transport_t::packet_info_t info; + info.has_tsf = metadata.has_time_spec; + + if (metadata.has_time_spec) { + info.tsf = metadata.time_spec.to_ticks(_tick_rate); + } + + info.payload_bytes = nsamps_per_buff * _bytes_per_item; + info.eob = metadata.end_of_burst; + + // Write packet header + for (size_t i = 0; i < buffs.size(); i++) { + std::tie(buffs[i], _frame_buffs[i].second) = + _xports[i]->write_packet_header(_frame_buffs[i].first, info); + } + + return true; + } + + /*! + * Send the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + UHD_FORCE_INLINE void release_send_buff(const size_t channel) + { + _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second); + _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first)); + + _frame_buffs[channel].first = nullptr; + _frame_buffs[channel].second = 0; + } + +private: + // Transports for each channel + std::vector _xports; + + // Container to hold frame buffers for each channel and their packet sizes + std::vector> _frame_buffs; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Next channel from which to get a buffer, stored as a member to + // allow the streamer to continue where it stopped due to timeouts. + size_t _next_buff_to_get = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */ -- cgit v1.2.3