aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/include/uhdlib/transport')
-rw-r--r--host/lib/include/uhdlib/transport/get_aligned_buffs.hpp175
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_impl.hpp341
-rw-r--r--host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp207
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_impl.hpp307
-rw-r--r--host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp147
5 files changed, 1177 insertions, 0 deletions
diff --git a/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
new file mode 100644
index 000000000..662be6d2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/get_aligned_buffs.hpp
@@ -0,0 +1,175 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+#define INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/dynamic_bitset.hpp>
+#include <boost/format.hpp>
+
+namespace uhd { namespace transport {
+
+// Number of iterations that get_aligned_buffs will attempt to time align
+// packets before returning an alignment failure. get_aligned_buffs increments
+// the iteration count when it finds a timestamp that is larger than the
+// timestamps on channels it has already aligned and thus has to restart
+// aligning timestamps on all channels to the new timestamp.
+constexpr size_t ALIGNMENT_FAILURE_THRESHOLD = 1000;
+
+/*!
+ * Implementation of rx time alignment. This method reads packets from the
+ * transports for each channel and discards any packets whose tsf does not
+ * match those of other channels due to dropped packets. Packets that do not
+ * have a tsf are not checked for alignment and never dropped.
+ */
+template <typename transport_t>
+class get_aligned_buffs
+{
+public:
+ enum alignment_result_t {
+ SUCCESS,
+ TIMEOUT,
+ SEQUENCE_ERROR,
+ ALIGNMENT_FAILURE,
+ BAD_PACKET
+ };
+
+ get_aligned_buffs(std::vector<typename transport_t::uptr>& xports,
+ std::vector<typename transport_t::buff_t::uptr>& frame_buffs,
+ std::vector<typename transport_t::packet_info_t>& infos)
+ : _xports(xports)
+ , _frame_buffs(frame_buffs)
+ , _infos(infos)
+ , _prev_tsf(_xports.size(), 0)
+ , _channels_to_align(_xports.size())
+ {
+ }
+
+ alignment_result_t operator()(const int32_t timeout_ms)
+ {
+ // Clear state
+ _channels_to_align.set();
+ bool time_valid = false;
+ uint64_t tsf = 0;
+ size_t iterations = 0;
+
+ while (_channels_to_align.any()) {
+ const size_t chan = _channels_to_align.find_first();
+ auto& xport = _xports[chan];
+ auto& info = _infos[chan];
+ auto& frame_buff = _frame_buffs[chan];
+ bool seq_error = false;
+
+ // Receive a data packet for the channel if we don't have one. A
+ // packet may already be there if the previous call was interrupted
+ // by an error.
+ if (!frame_buff) {
+ try {
+ std::tie(frame_buff, info, seq_error) =
+ xport->get_recv_buff(timeout_ms);
+ } catch (const uhd::value_error& e) {
+ // Bad packet
+ UHD_LOGGER_ERROR("STREAMER")
+ << boost::format(
+ "The receive transport caught a value exception.\n%s")
+ % e.what();
+ return BAD_PACKET;
+ }
+ }
+
+ if (!frame_buff) {
+ return TIMEOUT;
+ }
+
+ if (info.has_tsf) {
+ const bool time_out_of_order = _prev_tsf[chan] > info.tsf;
+ _prev_tsf[chan] = info.tsf;
+
+ // If the user changes the device time while streaming, we can
+ // receive a packet that comes before the previous packet in
+ // time. This would cause the alignment logic to discard future
+ // received packets. Therefore, when this occurs, we reset the
+ // info to restart the alignment.
+ if (time_out_of_order) {
+ time_valid = false;
+ }
+
+ // Check if the time is larger than packets received for other
+ // channels, and if so, use this time to align all channels
+ if (!time_valid || info.tsf > tsf) {
+ // If we haven't found a set of aligned packets after many
+ // iterations, return an alignment failure
+ if (iterations++ > ALIGNMENT_FAILURE_THRESHOLD) {
+ UHD_LOGGER_ERROR("STREAMER")
+ << "The rx streamer failed to time-align packets.";
+ return ALIGNMENT_FAILURE;
+ }
+
+ // Release buffers for channels aligned previously. Keep
+ // buffers that don't have a tsf since we don't need to
+ // align those.
+ for (size_t i = 0; i < _xports.size(); i++) {
+ if (!_channels_to_align.test(i) && _infos[i].has_tsf) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ _frame_buffs[i] = nullptr;
+ }
+ }
+
+ // Mark only this channel as aligned and save its tsf
+ _channels_to_align.set();
+ _channels_to_align.reset(chan);
+ time_valid = true;
+ tsf = info.tsf;
+ }
+
+ // Check if the time matches that of other aligned channels
+ else if (info.tsf == tsf) {
+ _channels_to_align.reset(chan);
+ }
+
+ // Otherwise, time is smaller than other channels, release the buffer
+ else {
+ _xports[chan]->release_recv_buff(std::move(_frame_buffs[chan]));
+ _frame_buffs[chan] = nullptr;
+ }
+ } else {
+ // Packet doesn't have a tsf, just mark it as aligned
+ _channels_to_align.reset(chan);
+ }
+
+ // If this packet had a sequence error, stop to return the error.
+ // Keep the packet for the next call to get_aligned_buffs.
+ if (seq_error) {
+ return SEQUENCE_ERROR;
+ }
+ }
+
+ // All channels aligned
+ return SUCCESS;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr>& _xports;
+
+ // Storage for buffers resulting from alignment
+ std::vector<typename transport_t::buff_t::uptr>& _frame_buffs;
+
+ // Packet info corresponding to aligned buffers
+ std::vector<typename transport_t::packet_info_t>& _infos;
+
+ // Time of previous packet for each channel
+ std::vector<uint64_t> _prev_tsf;
+
+ // Keeps track of channels that are aligned
+ boost::dynamic_bitset<> _channels_to_align;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_GET_ALIGNED_BUFFS_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
new file mode 100644
index 000000000..d66e867bc
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_impl.hpp
@@ -0,0 +1,341 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/endianness.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/rx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for error handling
+ *
+ * If a recv call reads data from multiple packets, and an error occurs in the
+ * second or later packets, recv stops short of the num samps requested and
+ * returns no error. The error is cached for the next call to recv.
+ *
+ * Timeout errors are an exception. Timeouts that occur in the second or later
+ * packets of a recv call stop the recv method but the error is not returned in
+ * the next call. The user can check for this condition since fewer samples are
+ * returned than the number requested.
+ */
+class rx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache, ignoring timeout errors
+ UHD_FORCE_INLINE void store(const rx_metadata_t& metadata)
+ {
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE bool check(rx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ metadata = _metadata_cache;
+ _cached_metadata = false;
+ return true;
+ }
+ return false;
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::rx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of rx streamer API
+ */
+template <typename transport_t>
+class rx_streamer_impl : public rx_streamer
+{
+public:
+ //! Constructor
+ rx_streamer_impl(const size_t num_ports, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_ports)
+ , _in_buffs(num_ports)
+ {
+ if (stream_args.cpu_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a cpu_format!");
+ }
+ if (stream_args.otw_format.empty()) {
+ throw uhd::value_error("[rx_stream] Must provide a otw_format!");
+ }
+ _setup_converters(num_ports, stream_args);
+ _zero_copy_streamer.set_samp_rate(_samp_rate);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ //! Implementation of rx_streamer API method
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ //! Implementation of rx_streamer API method
+ UHD_INLINE size_t recv(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const double timeout,
+ const bool one_packet)
+ {
+ if (_error_metadata_cache.check(metadata)) {
+ return 0;
+ }
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ size_t total_samps_recv =
+ _recv_one_packet(buffs, nsamps_per_buff, metadata, timeout_ms);
+
+ if (one_packet or metadata.end_of_burst) {
+ return total_samps_recv;
+ }
+
+ // First set of packets recv had an error, return immediately
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ return total_samps_recv;
+ }
+
+ // Loop until buffer is filled or error code. This method returns the
+ // metadata from the first packet received, with the exception of
+ // end-of-burst.
+ uhd::rx_metadata_t loop_metadata;
+
+ while (total_samps_recv < nsamps_per_buff) {
+ size_t num_samps = _recv_one_packet(buffs,
+ nsamps_per_buff - total_samps_recv,
+ loop_metadata,
+ timeout_ms,
+ total_samps_recv * _convert_info.bytes_per_cpu_item);
+
+ // If metadata had an error code set, store for next call and return
+ if (loop_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) {
+ _error_metadata_cache.store(loop_metadata);
+ break;
+ }
+
+ total_samps_recv += num_samps;
+
+ // Return immediately if end of burst
+ if (loop_metadata.end_of_burst) {
+ metadata.end_of_burst = true;
+ break;
+ }
+ }
+
+ return total_samps_recv;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ _zero_copy_streamer.set_samp_rate(rate);
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+ //! Receive a single packet
+ UHD_FORCE_INLINE size_t _recv_one_packet(const uhd::rx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t& metadata,
+ const int32_t timeout_ms,
+ const size_t buffer_offset_bytes = 0)
+ {
+ if (_buff_samps_remaining == 0) {
+ // Current set of buffers has expired, get the next one
+ _buff_samps_remaining =
+ _zero_copy_streamer.get_recv_buffs(_in_buffs, metadata, timeout_ms);
+ _fragment_offset_in_samps = 0;
+ } else {
+ // There are samples still left in the current set of buffers
+ metadata = _last_fragment_metadata;
+ metadata.time_spec += time_spec_t::from_ticks(
+ _fragment_offset_in_samps - metadata.fragment_offset, _samp_rate);
+ }
+
+ if (_buff_samps_remaining != 0) {
+ const size_t num_samps = std::min(nsamps_per_buff, _buff_samps_remaining);
+
+ // Convert samples to the streamer's output format
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ char* b = reinterpret_cast<char*>(buffs[i]);
+ const uhd::rx_streamer::buffs_type out_buffs(b + buffer_offset_bytes);
+ _convert_to_out_buff(out_buffs, i, num_samps);
+ }
+
+ _buff_samps_remaining -= num_samps;
+
+ // Write the fragment flags and offset
+ metadata.more_fragments = _buff_samps_remaining != 0;
+ metadata.fragment_offset = _fragment_offset_in_samps;
+
+ if (metadata.more_fragments) {
+ _fragment_offset_in_samps += num_samps;
+ _last_fragment_metadata = metadata;
+ }
+
+ return num_samps;
+ } else {
+ return 0;
+ }
+ }
+
+ //! Convert samples for one channel into its buffer
+ UHD_FORCE_INLINE void _convert_to_out_buff(
+ const uhd::rx_streamer::buffs_type& out_buffs,
+ const size_t chan,
+ const size_t num_samps)
+ {
+ const char* buffer_ptr = reinterpret_cast<const char*>(_in_buffs[chan]);
+
+ _converters[chan]->conv(buffer_ptr, out_buffs, num_samps);
+
+ // Advance the pointer for the source buffer
+ _in_buffs[chan] =
+ buffer_ptr + num_samps * _convert_info.bytes_per_otw_item;
+
+ if (_buff_samps_remaining == num_samps) {
+ _zero_copy_streamer.release_recv_buff(chan);
+ }
+ }
+
+ //! Create converters and initialize _convert_info
+ void _setup_converters(const size_t num_ports,
+ const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.otw_format + "_chdr";
+ id.num_inputs = 1;
+ id.output_format = stream_args.cpu_format;
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.input_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.output_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_ports; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(1 / 32767.0);
+ }
+ }
+
+ // Converter and item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Implementation of frame buffer management and packet info
+ rx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Container for buffer pointers used in recv method
+ std::vector<const void*> _in_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Num samps remaining in buffer currently held by zero copy streamer
+ size_t _buff_samps_remaining = 0;
+
+ // Metadata cache for error handling
+ detail::rx_metadata_cache _error_metadata_cache;
+
+ // Fragment (partially read packet) information
+ size_t _fragment_offset_in_samps = 0;
+ rx_metadata_t _last_fragment_metadata;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..36f568f2d
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/rx_streamer_zero_copy.hpp
@@ -0,0 +1,207 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/get_aligned_buffs.hpp>
+#include <boost/format.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of rx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class rx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ rx_streamer_zero_copy(const size_t num_ports)
+ : _xports(num_ports)
+ , _frame_buffs(num_ports)
+ , _infos(num_ports)
+ , _get_aligned_buffs(_xports, _frame_buffs, _infos)
+ {
+ }
+
+ ~rx_streamer_zero_copy()
+ {
+ for (size_t i = 0; i < _frame_buffs.size(); i++) {
+ if (_frame_buffs[i]) {
+ _xports[i]->release_recv_buff(std::move(_frame_buffs[i]));
+ }
+ }
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of time-aligned buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param metadata returns the metadata corresponding to the buffer
+ * \param timeout_ms timeout in milliseconds
+ * \return the size in samples of each packet, or 0 if timeout
+ */
+ size_t get_recv_buffs(std::vector<const void*>& buffs,
+ rx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ metadata.reset();
+
+ switch (_get_aligned_buffs(timeout_ms)) {
+ case get_aligned_buffs_t::SUCCESS:
+ break;
+
+ case get_aligned_buffs_t::BAD_PACKET:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ return 0;
+
+ case get_aligned_buffs_t::TIMEOUT:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ return 0;
+
+ case get_aligned_buffs_t::ALIGNMENT_FAILURE:
+ metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ return 0;
+
+ case get_aligned_buffs_t::SEQUENCE_ERROR:
+ metadata.has_time_spec = _last_read_time_info.has_time_spec;
+ metadata.time_spec =
+ _last_read_time_info.time_spec
+ + time_spec_t::from_ticks(_last_read_time_info.num_samps, _samp_rate);
+ metadata.out_of_sequence = true;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ return 0;
+
+ default:
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+
+ // Get payload pointers for each buffer and aggregate eob. We set eob to
+ // true if any channel has it set, since no more data will be received for
+ // that channel. In most cases, all channels should have the same value.
+ bool eob = false;
+ for (size_t i = 0; i < buffs.size(); i++) {
+ buffs[i] = _infos[i].payload;
+ eob |= _infos[i].eob;
+ }
+
+ // Set the metadata from the buffer information at index zero
+ const auto& info_0 = _infos[0];
+
+ metadata.has_time_spec = info_0.has_tsf;
+ metadata.time_spec = time_spec_t::from_ticks(info_0.tsf, _tick_rate);
+ metadata.start_of_burst = false;
+ metadata.end_of_burst = eob;
+ metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+
+ // Done with these packets, save timestamp info for next call
+ _last_read_time_info.has_time_spec = metadata.has_time_spec;
+ _last_read_time_info.time_spec = metadata.time_spec;
+ _last_read_time_info.num_samps = info_0.payload_bytes / _bytes_per_item;
+
+ return _last_read_time_info.num_samps;
+ }
+
+ /*!
+ * Release the packet for the specified channel
+ *
+ * \param channel the channel for which to release the packet
+ */
+ void release_recv_buff(const size_t channel)
+ {
+ _xports[channel]->release_recv_buff(std::move(_frame_buffs[channel]));
+ _frame_buffs[channel] = typename transport_t::buff_t::uptr();
+ }
+
+private:
+ using get_aligned_buffs_t = get_aligned_buffs<transport_t>;
+
+ // Information recorded by streamer about the last data packet processed,
+ // used to create the metadata when there is a sequence error.
+ struct last_read_time_info_t
+ {
+ size_t num_samps = 0;
+ bool has_time_spec = false;
+ time_spec_t time_spec;
+ };
+
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Storage for buffers for each channel while they are in flight (between
+ // calls to get_recv_buff and release_recv_buff).
+ std::vector<typename transport_t::buff_t::uptr> _frame_buffs;
+
+ // Packet info corresponding to the packets in flight
+ std::vector<typename transport_t::packet_info_t> _infos;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _samp_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Implementation of packet time alignment
+ get_aligned_buffs_t _get_aligned_buffs;
+
+ // Information about the last data packet processed
+ last_read_time_info_t _last_read_time_info;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_RX_STREAMER_ZERO_COPY_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
new file mode 100644
index 000000000..60881dad2
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp
@@ -0,0 +1,307 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_IMPL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhdlib/transport/tx_streamer_zero_copy.hpp>
+#include <limits>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+namespace detail {
+
+/*!
+ * Cache of metadata for send calls with zero samples
+ *
+ * Metadata is cached when we get a send requesting a start of burst with no
+ * samples. It is applied here on the next call to send() that actually has
+ * samples to send.
+ */
+class tx_metadata_cache
+{
+public:
+ //! Stores metadata in the cache
+ UHD_FORCE_INLINE void store(const tx_metadata_t& metadata)
+ {
+ _metadata_cache = metadata;
+ _cached_metadata = true;
+ }
+
+ //! Checks for cached metadata
+ UHD_FORCE_INLINE void check(tx_metadata_t& metadata)
+ {
+ if (_cached_metadata) {
+ // Only use cached time_spec if metadata does not have one
+ if (!metadata.has_time_spec) {
+ metadata.has_time_spec = _metadata_cache.has_time_spec;
+ metadata.time_spec = _metadata_cache.time_spec;
+ }
+ metadata.start_of_burst = _metadata_cache.start_of_burst;
+ metadata.end_of_burst = _metadata_cache.end_of_burst;
+ _cached_metadata = false;
+ }
+ }
+
+private:
+ // Whether there is a cached metadata object
+ bool _cached_metadata = false;
+
+ // Cached metadata value
+ uhd::tx_metadata_t _metadata_cache;
+};
+
+} // namespace detail
+
+/*!
+ * Implementation of tx streamer API
+ */
+template <typename transport_t>
+class tx_streamer_impl : public tx_streamer
+{
+public:
+ tx_streamer_impl(const size_t num_chans, const uhd::stream_args_t stream_args)
+ : _zero_copy_streamer(num_chans)
+ , _zero_buffs(num_chans, &_zero)
+ , _out_buffs(num_chans)
+ {
+ _setup_converters(num_chans, stream_args);
+ _zero_copy_streamer.set_bytes_per_item(_convert_info.bytes_per_otw_item);
+ _spp = stream_args.args.cast<size_t>("spp", _spp);
+ }
+
+ void connect_channel(const size_t channel, typename transport_t::uptr xport)
+ {
+ const size_t max_pyld_size = xport->get_max_payload_size();
+ _zero_copy_streamer.connect_channel(channel, std::move(xport));
+
+ // Set spp based on the transport frame size
+ const size_t xport_spp = max_pyld_size / _convert_info.bytes_per_otw_item;
+ _spp = std::min(_spp, xport_spp);
+ }
+
+ size_t get_num_channels() const
+ {
+ return _zero_copy_streamer.get_num_channels();
+ }
+
+ size_t get_max_num_samps() const
+ {
+ return _spp;
+ }
+
+
+ /*! Get width of each over-the-wire item component. For complex items,
+ * returns the width of one component only (real or imaginary).
+ */
+ size_t get_otw_item_comp_bit_width() const
+ {
+ return _convert_info.otw_item_bit_width;
+ }
+
+ size_t send(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t& metadata_,
+ const double timeout)
+ {
+ uhd::tx_metadata_t metadata(metadata_);
+
+ if (nsamps_per_buff == 0 && metadata.start_of_burst) {
+ _metadata_cache.store(metadata);
+ return 0;
+ }
+
+ _metadata_cache.check(metadata);
+
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ if (nsamps_per_buff == 0) {
+ // Send requests with no samples are handled here, such as end of
+ // burst. Send packets need to have at least one sample based on the
+ // chdr specification, so we use _zero_buffs here.
+ _send_one_packet(_zero_buffs.data(),
+ 0, // buffer offset
+ 1, // num samples
+ metadata,
+ timeout_ms);
+
+ return 0;
+ } else if (nsamps_per_buff <= _spp) {
+ return _send_one_packet(buffs, 0, nsamps_per_buff, metadata, timeout_ms);
+
+ } else {
+ size_t total_num_samps_sent = 0;
+ const bool eob = metadata.end_of_burst;
+ metadata.end_of_burst = false;
+
+ const size_t num_fragments = (nsamps_per_buff - 1) / _spp;
+ const size_t final_length = ((nsamps_per_buff - 1) % _spp) + 1;
+
+ for (size_t i = 0; i < num_fragments; i++) {
+ const size_t num_samps_sent = _send_one_packet(
+ buffs, total_num_samps_sent, _spp, metadata, timeout_ms);
+
+ total_num_samps_sent += num_samps_sent;
+
+ if (num_samps_sent == 0) {
+ return total_num_samps_sent;
+ }
+
+ // Setup timespec for the next fragment
+ if (metadata.has_time_spec) {
+ metadata.time_spec =
+ metadata.time_spec
+ + time_spec_t::from_ticks(num_samps_sent, _samp_rate);
+ }
+
+ metadata.start_of_burst = false;
+ }
+
+ // Send the final fragment
+ metadata.end_of_burst = eob;
+
+ size_t nsamps_sent =
+ total_num_samps_sent
+ + _send_one_packet(
+ buffs, total_num_samps_sent, final_length, metadata, timeout);
+
+ return nsamps_sent;
+ }
+ }
+
+ //! Implementation of rx_streamer API method
+ bool recv_async_msg(
+ uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/)
+ {
+ // TODO: implement me
+ return false;
+ }
+
+protected:
+ //! Configures scaling factor for conversion
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ _converters[chan]->set_scalar(scale_factor);
+ }
+
+ //! Configures sample rate for conversion of timestamp
+ void set_samp_rate(const double rate)
+ {
+ _samp_rate = rate;
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _zero_copy_streamer.set_tick_rate(rate);
+ }
+
+private:
+ //! Converter and associated item sizes
+ struct convert_info
+ {
+ size_t bytes_per_otw_item;
+ size_t bytes_per_cpu_item;
+ size_t otw_item_bit_width;
+ };
+
+ //! Convert samples for one channel and sends a packet
+ size_t _send_one_packet(const uhd::tx_streamer::buffs_type& buffs,
+ const size_t buffer_offset_in_samps,
+ const size_t num_samples,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ if (!_zero_copy_streamer.get_send_buffs(
+ _out_buffs, num_samples, metadata, timeout_ms)) {
+ return 0;
+ }
+
+ size_t byte_offset = buffer_offset_in_samps * _convert_info.bytes_per_cpu_item;
+
+ for (size_t i = 0; i < get_num_channels(); i++) {
+ const void* input_ptr = static_cast<const uint8_t*>(buffs[i]) + byte_offset;
+ _converters[i]->conv(input_ptr, _out_buffs[i], num_samples);
+
+ _zero_copy_streamer.release_send_buff(i);
+ }
+
+ return num_samples;
+ }
+
+ //! Create converters and initialize _bytes_per_cpu_item
+ void _setup_converters(const size_t num_chans, const uhd::stream_args_t stream_args)
+ {
+ // Note to code archaeologists: In the past, we had to also specify the
+ // endianness here, but that is no longer necessary because we can make
+ // the wire endianness match the host endianness.
+ convert::id_type id;
+ id.input_format = stream_args.cpu_format;
+ id.num_inputs = 1;
+ id.output_format = stream_args.otw_format + "_chdr";
+ id.num_outputs = 1;
+
+ auto starts_with = [](const std::string& s, const std::string v) {
+ return s.find(v) == 0;
+ };
+
+ const bool otw_is_complex = starts_with(stream_args.otw_format, "fc")
+ || starts_with(stream_args.otw_format, "sc");
+
+ convert_info info;
+ info.bytes_per_otw_item = convert::get_bytes_per_item(id.output_format);
+ info.bytes_per_cpu_item = convert::get_bytes_per_item(id.input_format);
+
+ if (otw_is_complex) {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8 / 2;
+ } else {
+ info.otw_item_bit_width = info.bytes_per_otw_item * 8;
+ }
+
+ _convert_info = info;
+
+ for (size_t i = 0; i < num_chans; i++) {
+ _converters.push_back(convert::get_converter(id)());
+ _converters.back()->set_scalar(32767.0);
+ }
+ }
+
+ // Converter item sizes
+ convert_info _convert_info;
+
+ // Converters
+ std::vector<uhd::convert::converter::sptr> _converters;
+
+ // Manages frame buffers and packet info
+ tx_streamer_zero_copy<transport_t> _zero_copy_streamer;
+
+ // Buffer used to handle send calls with no data
+ std::vector<const void*> _zero_buffs;
+
+ const uint64_t _zero = 0;
+
+ // Container for buffer pointers used in send method
+ std::vector<void*> _out_buffs;
+
+ // Sample rate used to calculate metadata time_spec_t
+ double _samp_rate = 1.0;
+
+ // Maximum number of samples per packet
+ size_t _spp = std::numeric_limits<std::size_t>::max();
+
+ // Metadata cache for send calls with no data
+ detail::tx_metadata_cache _metadata_cache;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_TX_STREAMER_IMPL_HPP */
diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
new file mode 100644
index 000000000..1b6f55238
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp
@@ -0,0 +1,147 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+#define INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/types/metadata.hpp>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Implementation of rx streamer manipulation of frame buffers and packet info.
+ * This class is part of tx_streamer_impl, split into a separate unit as it is
+ * a mostly self-contained portion of the streamer logic.
+ */
+template <typename transport_t>
+class tx_streamer_zero_copy
+{
+public:
+ //! Constructor
+ tx_streamer_zero_copy(const size_t num_chans)
+ : _xports(num_chans), _frame_buffs(num_chans)
+ {
+ }
+
+ //! Connect a new channel to the streamer
+ void connect_channel(const size_t port, typename transport_t::uptr xport)
+ {
+ if (port >= get_num_channels()) {
+ throw uhd::index_error(
+ "Port number indexes beyond the number of streamer ports");
+ }
+
+ if (_xports[port]) {
+ throw uhd::runtime_error(
+ "Streamer port number is already connected to a port");
+ }
+
+ _xports[port] = std::move(xport);
+ }
+
+ //! Returns number of channels handled by this streamer
+ size_t get_num_channels() const
+ {
+ return _xports.size();
+ }
+
+ //! Configures tick rate for conversion of timestamp
+ void set_tick_rate(const double rate)
+ {
+ _tick_rate = rate;
+ }
+
+ //! Configures the size of each sample
+ void set_bytes_per_item(const size_t bpi)
+ {
+ _bytes_per_item = bpi;
+ }
+
+ /*!
+ * Gets a set of frame buffers, one per channel.
+ *
+ * \param buffs returns a pointer to the buffer data
+ * \param nsamps_per_buff the number of samples that will be written to each buffer
+ * \param metadata the metadata to write to the packet header
+ * \param timeout_ms timeout in milliseconds
+ * \return true if the operation was sucessful, false if timeout occurs
+ */
+ UHD_FORCE_INLINE bool get_send_buffs(std::vector<void*>& buffs,
+ const size_t nsamps_per_buff,
+ const tx_metadata_t& metadata,
+ const int32_t timeout_ms)
+ {
+ // Try to get a buffer per channel
+ for (; _next_buff_to_get < _xports.size(); _next_buff_to_get++) {
+ _frame_buffs[_next_buff_to_get].first =
+ _xports[_next_buff_to_get]->get_send_buff(timeout_ms);
+
+ if (!_frame_buffs[_next_buff_to_get].first) {
+ return false;
+ }
+ }
+
+ // Got all the buffers, start from index 0 next call
+ _next_buff_to_get = 0;
+
+ // Store portions of metadata we care about
+ typename transport_t::packet_info_t info;
+ info.has_tsf = metadata.has_time_spec;
+
+ if (metadata.has_time_spec) {
+ info.tsf = metadata.time_spec.to_ticks(_tick_rate);
+ }
+
+ info.payload_bytes = nsamps_per_buff * _bytes_per_item;
+ info.eob = metadata.end_of_burst;
+
+ // Write packet header
+ for (size_t i = 0; i < buffs.size(); i++) {
+ std::tie(buffs[i], _frame_buffs[i].second) =
+ _xports[i]->write_packet_header(_frame_buffs[i].first, info);
+ }
+
+ return true;
+ }
+
+ /*!
+ * Send the packet for the specified channel
+ *
+ * \param channel the channel for which to release the packet
+ */
+ UHD_FORCE_INLINE void release_send_buff(const size_t channel)
+ {
+ _frame_buffs[channel].first->set_packet_size(_frame_buffs[channel].second);
+ _xports[channel]->release_send_buff(std::move(_frame_buffs[channel].first));
+
+ _frame_buffs[channel].first = nullptr;
+ _frame_buffs[channel].second = 0;
+ }
+
+private:
+ // Transports for each channel
+ std::vector<typename transport_t::uptr> _xports;
+
+ // Container to hold frame buffers for each channel and their packet sizes
+ std::vector<std::pair<typename transport_t::buff_t::uptr, size_t>> _frame_buffs;
+
+ // Rate used in conversion of timestamp to time_spec_t
+ double _tick_rate = 1.0;
+
+ // Size of a sample on the device
+ size_t _bytes_per_item = 0;
+
+ // Next channel from which to get a buffer, stored as a member to
+ // allow the streamer to continue where it stopped due to timeouts.
+ size_t _next_buff_to_get = 0;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TX_STREAMER_ZERO_COPY_HPP */