diff options
Diffstat (limited to 'host/lib/include/uhdlib/transport')
5 files changed, 1177 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp new file mode 100644 index 000000000..662be6d2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp @@ -0,0 +1,175 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP +#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/dynamic_bitset.hpp> +#include <boost/format.hpp> + +namespace uhd { namespace transport { + +// Number of iterations that get_aligned_buffs will attempt to time align +// packets before returning an alignment failure. get_aligned_buffs increments +// the iteration count when it finds a timestamp that is larger than the +// timestamps on channels it has already aligned and thus has to restart +// aligning timestamps on all channels to the new timestamp. +constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000; + +/*! + * Implementation of rx time alignment. This method reads packets from the + * transports for each channel and discards any packets whose tsf does not + * match those of other channels due to dropped packets. Packets that do not + * have a tsf are not checked for alignment and never dropped. + */ +template <typename transport_t> +class get_aligned_buffs +{ +public: + enum alignment_result_t { + SUCCESS, + TIMEOUT, + SEQUENCE_ERROR, + ALIGNMENT_FAILURE, + BAD_PACKET + }; + + get_aligned_buffs(std::vector<typename transport_t::uptr>& xports, + std::vector<typename transport_t::buff_t::uptr>& frame_buffs, + std::vector<typename transport_t::packet_info_t>& infos) + : _xports(xports) + , _frame_buffs(frame_buffs) + , _infos(infos) + , _prev_tsf(_xports.size(), 0) + , _channels_to_align(_xports.size()) + { + } + + alignment_result_t operator()(const int32_t timeout_ms) + { + // Clear state + _channels_to_align.set(); + bool time_valid = false; + uint64_t tsf = 0; + size_t iterations = 0; + + while (_channels_to_align.any()) { + const size_t chan = _channels_to_align.find_first(); + auto& xport = _xports[chan]; + auto& info = _infos[chan]; + auto& frame_buff = _frame_buffs[chan]; + bool seq_error = false; + + // Receive a data packet for the channel if we don't have one. A + // packet may already be there if the previous call was interrupted + // by an error. + if (!frame_buff) { + try { + std::tie(frame_buff, info, seq_error) = + xport->get_recv_buff(timeout_ms); + } catch (const uhd::value_error& e) { + // Bad packet + UHD_LOGGER_ERROR("STREAMER") + << boost::format( + "The receive transport caught a value exception.\n%s") + % e.what(); + return BAD_PACKET; + } + } + + if (!frame_buff) { + return TIMEOUT; + } + + if (info.has_tsf) { + const bool time_out_of_order = _prev_tsf[chan] > info.tsf; + _prev_tsf[chan] = info.tsf; + + // If the user changes the device time while streaming, we can + // receive a packet that comes before the previous packet in + // time. This would cause the alignment logic to discard future + // received packets. Therefore, when this occurs, we reset the + // info to restart the alignment. + if (time_out_of_order) { + time_valid = false; + } + + // Check if the time is larger than packets received for other + // channels, and if so, use this time to align all channels + if (!time_valid || info.tsf > tsf) { + // If we haven't found a set of aligned packets after many + // iterations, return an alignment failure + if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) { + UHD_LOGGER_ERROR("STREAMER") + << "The rx streamer failed to time-align packets."; + return ALIGNMENT_FAILURE; + } + + // Release buffers for channels aligned previously. Keep + // buffers that don't have a tsf since we don't need to + // align those. + for (size_t i = 0; i < _xports.size(); i++) { + if (!_channels_to_align.test(i) && _infos[i].has_tsf) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + _frame_buffs[i] = nullptr; + } + } + + // Mark only this channel as aligned and save its tsf + _channels_to_align.set(); + _channels_to_align.reset(chan); + time_valid = true; + tsf = info.tsf; + } + + // Check if the time matches that of other aligned channels + else if (info.tsf == tsf) { + _channels_to_align.reset(chan); + } + + // Otherwise, time is smaller than other channels, release the buffer + else { + _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan])); + _frame_buffs[chan] = nullptr; + } + } else { + // Packet doesn't have a tsf, just mark it as aligned + _channels_to_align.reset(chan); + } + + // If this packet had a sequence error, stop to return the error. + // Keep the packet for the next call to get_aligned_buffs. + if (seq_error) { + return SEQUENCE_ERROR; + } + } + + // All channels aligned + return SUCCESS; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr>& _xports; + + // Storage for buffers resulting from alignment + std::vector<typename transport_t::buff_t::uptr>& _frame_buffs; + + // Packet info corresponding to aligned buffers + std::vector<typename transport_t::packet_info_t>& _infos; + + // Time of previous packet for each channel + std::vector<uint64_t> _prev_tsf; + + // Keeps track of channels that are aligned + boost::dynamic_bitset<> _channels_to_align; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp new file mode 100644 index 000000000..d66e867bc --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp @@ -0,0 +1,341 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/exception.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/endianness.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/rx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for error handling + * + * If a recv call reads data from multiple packets, and an error occurs in the + * second or later packets, recv stops short of the num samps requested and + * returns no error. The error is cached for the next call to recv. + * + * Timeout errors are an exception. Timeouts that occur in the second or later + * packets of a recv call stop the recv method but the error is not returned in + * the next call. The user can check for this condition since fewer samples are + * returned than the number requested. + */ +class rx_metadata_cache +{ +public: + //! Stores metadata in the cache, ignoring timeout errors + UHD_FORCE_INLINE void store(const rx_metadata_t& metadata) + { + if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) { + _metadata_cache = metadata; + _cached_metadata = true; + } + } + + //! Checks for cached metadata + UHD_FORCE_INLINE bool check(rx_metadata_t& metadata) + { + if (_cached_metadata) { + metadata = _metadata_cache; + _cached_metadata = false; + return true; + } + return false; + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::rx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of rx streamer API + */ +template <typename transport_t> +class rx_streamer_impl : public rx_streamer +{ +public: + //! Constructor + rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_ports) + , _in_buffs(num_ports) + { + if (stream_args.cpu_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a cpu_format!"); + } + if (stream_args.otw_format.empty()) { + throw uhd::value_error("[rx_stream] Must provide a otw_format!"); + } + _setup_converters(num_ports, stream_args); + _zero_copy_streamer.set_samp_rate(_samp_rate); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + //! Implementation of rx_streamer API method + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + //! Implementation of rx_streamer API method + size_t get_max_num_samps() const + { + return _spp; + } + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + //! Implementation of rx_streamer API method + UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const double timeout, + const bool one_packet) + { + if (_error_metadata_cache.check(metadata)) { + return 0; + } + + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + size_t total_samps_recv = + _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms); + + if (one_packet or metadata.end_of_burst) { + return total_samps_recv; + } + + // First set of packets recv had an error, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + return total_samps_recv; + } + + // Loop until buffer is filled or error code. This method returns the + // metadata from the first packet received, with the exception of + // end-of-burst. + uhd::rx_metadata_t loop_metadata; + + while (total_samps_recv < nsamps_per_buff) { + size_t num_samps = _recv_one_packet(buffs, + nsamps_per_buff - total_samps_recv, + loop_metadata, + timeout_ms, + total_samps_recv * _convert_info.bytes_per_cpu_item); + + // If metadata had an error code set, store for next call and return + if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + _error_metadata_cache.store(loop_metadata); + break; + } + + total_samps_recv += num_samps; + + // Return immediately if end of burst + if (loop_metadata.end_of_burst) { + metadata.end_of_burst = true; + break; + } + } + + return total_samps_recv; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + _zero_copy_streamer.set_samp_rate(rate); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Receive a single packet + UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t& metadata, + const int32_t timeout_ms, + const size_t buffer_offset_bytes = 0) + { + if (_buff_samps_remaining == 0) { + // Current set of buffers has expired, get the next one + _buff_samps_remaining = + _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms); + _fragment_offset_in_samps = 0; + } else { + // There are samples still left in the current set of buffers + metadata = _last_fragment_metadata; + metadata.time_spec += time_spec_t::from_ticks( + _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate); + } + + if (_buff_samps_remaining != 0) { + const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining); + + // Convert samples to the streamer's output format + for (size_t i = 0; i < get_num_channels(); i++) { + char* b = reinterpret_cast<char*>(buffs[i]); + const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes); + _convert_to_out_buff(out_buffs, i, num_samps); + } + + _buff_samps_remaining -= num_samps; + + // Write the fragment flags and offset + metadata.more_fragments = _buff_samps_remaining != 0; + metadata.fragment_offset = _fragment_offset_in_samps; + + if (metadata.more_fragments) { + _fragment_offset_in_samps += num_samps; + _last_fragment_metadata = metadata; + } + + return num_samps; + } else { + return 0; + } + } + + //! Convert samples for one channel into its buffer + UHD_FORCE_INLINE void _convert_to_out_buff( + const uhd::rx_streamer::buffs_type& out_buffs, + const size_t chan, + const size_t num_samps) + { + const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]); + + _converters[chan]->conv(buffer_ptr, out_buffs, num_samps); + + // Advance the pointer for the source buffer + _in_buffs[chan] = + buffer_ptr + num_samps * _convert_info.bytes_per_otw_item; + + if (_buff_samps_remaining == num_samps) { + _zero_copy_streamer.release_recv_buff(chan); + } + } + + //! Create converters and initialize _convert_info + void _setup_converters(const size_t num_ports, + const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.otw_format + "_chdr"; + id.num_inputs = 1; + id.output_format = stream_args.cpu_format; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_ports; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(1 / 32767.0); + } + } + + // Converter and item sizes + convert_info _convert_info; + + // Converters + std::vector<uhd::convert::converter::sptr> _converters; + + // Implementation of frame buffer management and packet info + rx_streamer_zero_copy<transport_t> _zero_copy_streamer; + + // Container for buffer pointers used in recv method + std::vector<const void*> _in_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits<std::size_t>::max(); + + // Num samps remaining in buffer currently held by zero copy streamer + size_t _buff_samps_remaining = 0; + + // Metadata cache for error handling + detail::rx_metadata_cache _error_metadata_cache; + + // Fragment (partially read packet) information + size_t _fragment_offset_in_samps = 0; + rx_metadata_t _last_fragment_metadata; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp new file mode 100644 index 000000000..36f568f2d --- /dev/null +++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp @@ -0,0 +1,207 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/get_aligned_buffs.hpp> +#include <boost/format.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of rx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class rx_streamer_zero_copy +{ +public: + //! Constructor + rx_streamer_zero_copy(const size_t num_ports) + : _xports(num_ports) + , _frame_buffs(num_ports) + , _infos(num_ports) + , _get_aligned_buffs(_xports, _frame_buffs, _infos) + { + } + + ~rx_streamer_zero_copy() + { + for (size_t i = 0; i < _frame_buffs.size(); i++) { + if (_frame_buffs[i]) { + _xports[i]->release_recv_buff(std::move(_frame_buffs[i])); + } + } + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of time-aligned buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param metadata returns the metadata corresponding to the buffer + * \param timeout_ms timeout in milliseconds + * \return the size in samples of each packet, or 0 if timeout + */ + size_t get_recv_buffs(std::vector<const void*>& buffs, + rx_metadata_t& metadata, + const int32_t timeout_ms) + { + metadata.reset(); + + switch (_get_aligned_buffs(timeout_ms)) { + case get_aligned_buffs_t::SUCCESS: + break; + + case get_aligned_buffs_t::BAD_PACKET: + metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return 0; + + case get_aligned_buffs_t::TIMEOUT: + metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return 0; + + case get_aligned_buffs_t::ALIGNMENT_FAILURE: + metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + return 0; + + case get_aligned_buffs_t::SEQUENCE_ERROR: + metadata.has_time_spec = _last_read_time_info.has_time_spec; + metadata.time_spec = + _last_read_time_info.time_spec + + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate); + metadata.out_of_sequence = true; + metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + return 0; + + default: + UHD_THROW_INVALID_CODE_PATH(); + } + + // Get payload pointers for each buffer and aggregate eob. We set eob to + // true if any channel has it set, since no more data will be received for + // that channel. In most cases, all channels should have the same value. + bool eob = false; + for (size_t i = 0; i < buffs.size(); i++) { + buffs[i] = _infos[i].payload; + eob |= _infos[i].eob; + } + + // Set the metadata from the buffer information at index zero + const auto& info_0 = _infos[0]; + + metadata.has_time_spec = info_0.has_tsf; + metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate); + metadata.start_of_burst = false; + metadata.end_of_burst = eob; + metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + // Done with these packets, save timestamp info for next call + _last_read_time_info.has_time_spec = metadata.has_time_spec; + _last_read_time_info.time_spec = metadata.time_spec; + _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item; + + return _last_read_time_info.num_samps; + } + + /*! + * Release the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + void release_recv_buff(const size_t channel) + { + _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel])); + _frame_buffs[channel] = typename transport_t::buff_t::uptr(); + } + +private: + using get_aligned_buffs_t = get_aligned_buffs<transport_t>; + + // Information recorded by streamer about the last data packet processed, + // used to create the metadata when there is a sequence error. + struct last_read_time_info_t + { + size_t num_samps = 0; + bool has_time_spec = false; + time_spec_t time_spec; + }; + + // Transports for each channel + std::vector<typename transport_t::uptr> _xports; + + // Storage for buffers for each channel while they are in flight (between + // calls to get_recv_buff and release_recv_buff). + std::vector<typename transport_t::buff_t::uptr> _frame_buffs; + + // Packet info corresponding to the packets in flight + std::vector<typename transport_t::packet_info_t> _infos; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Rate used in conversion of timestamp to time_spec_t + double _samp_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Implementation of packet time alignment + get_aligned_buffs_t _get_aligned_buffs; + + // Information about the last data packet processed + last_read_time_info_t _last_read_time_info; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp new file mode 100644 index 000000000..60881dad2 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -0,0 +1,307 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP + +#include <uhd/config.hpp> +#include <uhd/convert.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhdlib/transport/tx_streamer_zero_copy.hpp> +#include <limits> +#include <vector> + +namespace uhd { namespace transport { + +namespace detail { + +/*! + * Cache of metadata for send calls with zero samples + * + * Metadata is cached when we get a send requesting a start of burst with no + * samples. It is applied here on the next call to send() that actually has + * samples to send. + */ +class tx_metadata_cache +{ +public: + //! Stores metadata in the cache + UHD_FORCE_INLINE void store(const tx_metadata_t& metadata) + { + _metadata_cache = metadata; + _cached_metadata = true; + } + + //! Checks for cached metadata + UHD_FORCE_INLINE void check(tx_metadata_t& metadata) + { + if (_cached_metadata) { + // Only use cached time_spec if metadata does not have one + if (!metadata.has_time_spec) { + metadata.has_time_spec = _metadata_cache.has_time_spec; + metadata.time_spec = _metadata_cache.time_spec; + } + metadata.start_of_burst = _metadata_cache.start_of_burst; + metadata.end_of_burst = _metadata_cache.end_of_burst; + _cached_metadata = false; + } + } + +private: + // Whether there is a cached metadata object + bool _cached_metadata = false; + + // Cached metadata value + uhd::tx_metadata_t _metadata_cache; +}; + +} // namespace detail + +/*! + * Implementation of tx streamer API + */ +template <typename transport_t> +class tx_streamer_impl : public tx_streamer +{ +public: + tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args) + : _zero_copy_streamer(num_chans) + , _zero_buffs(num_chans, &_zero) + , _out_buffs(num_chans) + { + _setup_converters(num_chans, stream_args); + _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item); + _spp = stream_args.args.cast<size_t>("spp", _spp); + } + + void connect_channel(const size_t channel, typename transport_t::uptr xport) + { + const size_t max_pyld_size = xport->get_max_payload_size(); + _zero_copy_streamer.connect_channel(channel, std::move(xport)); + + // Set spp based on the transport frame size + const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item; + _spp = std::min(_spp, xport_spp); + } + + size_t get_num_channels() const + { + return _zero_copy_streamer.get_num_channels(); + } + + size_t get_max_num_samps() const + { + return _spp; + } + + + /*! Get width of each over-the-wire item component. For complex items, + * returns the width of one component only (real or imaginary). + */ + size_t get_otw_item_comp_bit_width() const + { + return _convert_info.otw_item_bit_width; + } + + size_t send(const uhd::tx_streamer::buffs_type& buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t& metadata_, + const double timeout) + { + uhd::tx_metadata_t metadata(metadata_); + + if (nsamps_per_buff == 0 && metadata.start_of_burst) { + _metadata_cache.store(metadata); + return 0; + } + + _metadata_cache.check(metadata); + + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + if (nsamps_per_buff == 0) { + // Send requests with no samples are handled here, such as end of + // burst. Send packets need to have at least one sample based on the + // chdr specification, so we use _zero_buffs here. + _send_one_packet(_zero_buffs.data(), + 0, // buffer offset + 1, // num samples + metadata, + timeout_ms); + + return 0; + } else if (nsamps_per_buff <= _spp) { + return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms); + + } else { + size_t total_num_samps_sent = 0; + const bool eob = metadata.end_of_burst; + metadata.end_of_burst = false; + + const size_t num_fragments = (nsamps_per_buff - 1) / _spp; + const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1; + + for (size_t i = 0; i < num_fragments; i++) { + const size_t num_samps_sent = _send_one_packet( + buffs, total_num_samps_sent, _spp, metadata, timeout_ms); + + total_num_samps_sent += num_samps_sent; + + if (num_samps_sent == 0) { + return total_num_samps_sent; + } + + // Setup timespec for the next fragment + if (metadata.has_time_spec) { + metadata.time_spec = + metadata.time_spec + + time_spec_t::from_ticks(num_samps_sent, _samp_rate); + } + + metadata.start_of_burst = false; + } + + // Send the final fragment + metadata.end_of_burst = eob; + + size_t nsamps_sent = + total_num_samps_sent + + _send_one_packet( + buffs, total_num_samps_sent, final_length, metadata, timeout); + + return nsamps_sent; + } + } + + //! Implementation of rx_streamer API method + bool recv_async_msg( + uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) + { + // TODO: implement me + return false; + } + +protected: + //! Configures scaling factor for conversion + void set_scale_factor(const size_t chan, const double scale_factor) + { + _converters[chan]->set_scalar(scale_factor); + } + + //! Configures sample rate for conversion of timestamp + void set_samp_rate(const double rate) + { + _samp_rate = rate; + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _zero_copy_streamer.set_tick_rate(rate); + } + +private: + //! Converter and associated item sizes + struct convert_info + { + size_t bytes_per_otw_item; + size_t bytes_per_cpu_item; + size_t otw_item_bit_width; + }; + + //! Convert samples for one channel and sends a packet + size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs, + const size_t buffer_offset_in_samps, + const size_t num_samples, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + if (!_zero_copy_streamer.get_send_buffs( + _out_buffs, num_samples, metadata, timeout_ms)) { + return 0; + } + + size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item; + + for (size_t i = 0; i < get_num_channels(); i++) { + const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset; + _converters[i]->conv(input_ptr, _out_buffs[i], num_samples); + + _zero_copy_streamer.release_send_buff(i); + } + + return num_samples; + } + + //! Create converters and initialize _bytes_per_cpu_item + void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args) + { + // Note to code archaeologists: In the past, we had to also specify the + // endianness here, but that is no longer necessary because we can make + // the wire endianness match the host endianness. + convert::id_type id; + id.input_format = stream_args.cpu_format; + id.num_inputs = 1; + id.output_format = stream_args.otw_format + "_chdr"; + id.num_outputs = 1; + + auto starts_with = [](const std::string& s, const std::string v) { + return s.find(v) == 0; + }; + + const bool otw_is_complex = starts_with(stream_args.otw_format, "fc") + || starts_with(stream_args.otw_format, "sc"); + + convert_info info; + info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format); + info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format); + + if (otw_is_complex) { + info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2; + } else { + info.otw_item_bit_width = info.bytes_per_otw_item * 8; + } + + _convert_info = info; + + for (size_t i = 0; i < num_chans; i++) { + _converters.push_back(convert::get_converter(id)()); + _converters.back()->set_scalar(32767.0); + } + } + + // Converter item sizes + convert_info _convert_info; + + // Converters + std::vector<uhd::convert::converter::sptr> _converters; + + // Manages frame buffers and packet info + tx_streamer_zero_copy<transport_t> _zero_copy_streamer; + + // Buffer used to handle send calls with no data + std::vector<const void*> _zero_buffs; + + const uint64_t _zero = 0; + + // Container for buffer pointers used in send method + std::vector<void*> _out_buffs; + + // Sample rate used to calculate metadata time_spec_t + double _samp_rate = 1.0; + + // Maximum number of samples per packet + size_t _spp = std::numeric_limits<std::size_t>::max(); + + // Metadata cache for send calls with no data + detail::tx_metadata_cache _metadata_cache; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp new file mode 100644 index 000000000..1b6f55238 --- /dev/null +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -0,0 +1,147 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP +#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP + +#include <uhd/config.hpp> +#include <uhd/stream.hpp> +#include <uhd/types/metadata.hpp> +#include <vector> + +namespace uhd { namespace transport { + +/*! + * Implementation of rx streamer manipulation of frame buffers and packet info. + * This class is part of tx_streamer_impl, split into a separate unit as it is + * a mostly self-contained portion of the streamer logic. + */ +template <typename transport_t> +class tx_streamer_zero_copy +{ +public: + //! Constructor + tx_streamer_zero_copy(const size_t num_chans) + : _xports(num_chans), _frame_buffs(num_chans) + { + } + + //! Connect a new channel to the streamer + void connect_channel(const size_t port, typename transport_t::uptr xport) + { + if (port >= get_num_channels()) { + throw uhd::index_error( + "Port number indexes beyond the number of streamer ports"); + } + + if (_xports[port]) { + throw uhd::runtime_error( + "Streamer port number is already connected to a port"); + } + + _xports[port] = std::move(xport); + } + + //! Returns number of channels handled by this streamer + size_t get_num_channels() const + { + return _xports.size(); + } + + //! Configures tick rate for conversion of timestamp + void set_tick_rate(const double rate) + { + _tick_rate = rate; + } + + //! Configures the size of each sample + void set_bytes_per_item(const size_t bpi) + { + _bytes_per_item = bpi; + } + + /*! + * Gets a set of frame buffers, one per channel. + * + * \param buffs returns a pointer to the buffer data + * \param nsamps_per_buff the number of samples that will be written to each buffer + * \param metadata the metadata to write to the packet header + * \param timeout_ms timeout in milliseconds + * \return true if the operation was sucessful, false if timeout occurs + */ + UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs, + const size_t nsamps_per_buff, + const tx_metadata_t& metadata, + const int32_t timeout_ms) + { + // Try to get a buffer per channel + for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) { + _frame_buffs[_next_buff_to_get].first = + _xports[_next_buff_to_get]->get_send_buff(timeout_ms); + + if (!_frame_buffs[_next_buff_to_get].first) { + return false; + } + } + + // Got all the buffers, start from index 0 next call + _next_buff_to_get = 0; + + // Store portions of metadata we care about + typename transport_t::packet_info_t info; + info.has_tsf = metadata.has_time_spec; + + if (metadata.has_time_spec) { + info.tsf = metadata.time_spec.to_ticks(_tick_rate); + } + + info.payload_bytes = nsamps_per_buff * _bytes_per_item; + info.eob = metadata.end_of_burst; + + // Write packet header + for (size_t i = 0; i < buffs.size(); i++) { + std::tie(buffs[i], _frame_buffs[i].second) = + _xports[i]->write_packet_header(_frame_buffs[i].first, info); + } + + return true; + } + + /*! + * Send the packet for the specified channel + * + * \param channel the channel for which to release the packet + */ + UHD_FORCE_INLINE void release_send_buff(const size_t channel) + { + _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second); + _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first)); + + _frame_buffs[channel].first = nullptr; + _frame_buffs[channel].second = 0; + } + +private: + // Transports for each channel + std::vector<typename transport_t::uptr> _xports; + + // Container to hold frame buffers for each channel and their packet sizes + std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs; + + // Rate used in conversion of timestamp to time_spec_t + double _tick_rate = 1.0; + + // Size of a sample on the device + size_t _bytes_per_item = 0; + + // Next channel from which to get a buffer, stored as a member to + // allow the streamer to continue where it stopped due to timeouts. + size_t _next_buff_to_get = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */ |