aboutsummaryrefslogtreecommitdiffstats
path: root/host/tests
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-05-23 20:38:07 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:29 -0800
commit75a090543b8fb8e7c875387eee6d3fe7227e4450 (patch)
tree2904b48607cc07158aa6b068ada35ab56c4da516 /host/tests
parentd8e9705bc6c34b8d015b56a76955ee2f15426bd8 (diff)
downloaduhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.gz
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.bz2
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.zip
rfnoc: add rx and tx transports, and amend rfnoc_graph
transports: Transports build on I/O service and implements flow control and sequence number checking. The rx streamer subclass extends the streamer implementation to connect it to the rfnoc graph. It receives configuration values from property propagation and configures the streamer accordingly. It also implements the issue_stream_cmd rx_streamer API method. Add implementation of rx streamer creation and method to connect it to an rfnoc block. rfnoc_graph: Cache more connection info, clarify contract Summary of changes: - rfnoc_graph stores more information about static connections at the beginning. Some search algorithms are replaced by simpler lookups. - The contract for connect() was clarified. It is required to call connect, even for static connections.
Diffstat (limited to 'host/tests')
-rw-r--r--host/tests/CMakeLists.txt2
-rw-r--r--host/tests/common/mock_link.hpp8
-rw-r--r--host/tests/rfnoc_chdr_test.cpp46
-rw-r--r--host/tests/rx_streamer_test.cpp744
-rw-r--r--host/tests/tx_streamer_test.cpp393
5 files changed, 1193 insertions, 0 deletions
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index d6ad6d777..1cdb42b96 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -55,6 +55,8 @@ set(test_sources
fe_conn_test.cpp
rfnoc_node_test.cpp
link_test.cpp
+ rx_streamer_test.cpp
+ tx_streamer_test.cpp
)
set(benchmark_sources
diff --git a/host/tests/common/mock_link.hpp b/host/tests/common/mock_link.hpp
index 34ea15540..73a65916c 100644
--- a/host/tests/common/mock_link.hpp
+++ b/host/tests/common/mock_link.hpp
@@ -94,6 +94,14 @@ public:
}
/*!
+ * Return the number of packets stored in the mock link.
+ */
+ size_t get_num_packets() const
+ {
+ return _tx_mems.size();
+ }
+
+ /*!
* Retrieve the contents of a packet sent by the link. The link
* stores packets in a queue in the order they were sent.
*/
diff --git a/host/tests/rfnoc_chdr_test.cpp b/host/tests/rfnoc_chdr_test.cpp
index 417ed2f96..1c63d5976 100644
--- a/host/tests/rfnoc_chdr_test.cpp
+++ b/host/tests/rfnoc_chdr_test.cpp
@@ -222,3 +222,49 @@ BOOST_AUTO_TEST_CASE(chdr_strc_packet_no_swap_64)
std::cout << pyld.to_string();
}
}
+
+BOOST_AUTO_TEST_CASE(chdr_generic_packet_calculate_pyld_offset_64)
+{
+ // Check calculation without timestamp
+ auto test_pyld_offset = [](chdr_packet::uptr& pkt,
+ const packet_type_t pkt_type,
+ const size_t num_mdata)
+ {
+ uint64_t buff[MAX_BUF_SIZE_WORDS];
+ chdr_header header;
+ header.set_pkt_type(pkt_type);
+ header.set_num_mdata(num_mdata);
+
+ pkt->refresh(reinterpret_cast<void*>(buff), header, 0);
+
+ const size_t pyld_offset = pkt->calculate_payload_offset(
+ pkt_type, num_mdata);
+
+ void* pyld_ptr = pkt->get_payload_ptr();
+
+ const size_t non_pyld_bytes = static_cast<size_t>(
+ reinterpret_cast<uint8_t*>(pyld_ptr) -
+ reinterpret_cast<uint8_t*>(buff));
+
+ BOOST_CHECK(pyld_offset == non_pyld_bytes);
+ };
+
+ {
+ chdr_packet::uptr pkt = chdr64_be_factory.make_generic();
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2);
+ }
+ {
+ chdr_packet::uptr pkt = chdr256_be_factory.make_generic();
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1);
+ test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2);
+ }
+}
diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp
new file mode 100644
index 000000000..cd4daf569
--- /dev/null
+++ b/host/tests/rx_streamer_test.cpp
@@ -0,0 +1,744 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "../common/mock_link.hpp"
+#include <uhdlib/transport/rx_streamer_impl.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Contents of mock packet header
+ */
+struct mock_header_t
+{
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ bool ignore_seq = true;
+ size_t seq_num = 0;
+};
+
+/*!
+ * Mock rx data xport which doesn't use I/O service, and just interacts with
+ * the link directly.
+ */
+class mock_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<mock_rx_data_xport>;
+ using buff_t = uhd::transport::frame_buff;
+
+ //! Values extracted from received RX data packets
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ const void* payload = nullptr;
+ };
+
+ mock_rx_data_xport(mock_recv_link::sptr recv_link) : _recv_link(recv_link) {}
+
+ std::tuple<frame_buff::uptr, packet_info_t, bool> get_recv_buff(
+ const int32_t timeout_ms)
+ {
+ frame_buff::uptr buff = _recv_link->get_recv_buff(timeout_ms);
+ mock_header_t header = *(reinterpret_cast<mock_header_t*>(buff->data()));
+
+ packet_info_t info;
+ info.eob = header.eob;
+ info.has_tsf = header.has_tsf;
+ info.tsf = header.tsf;
+ info.payload_bytes = header.payload_bytes;
+ info.payload = reinterpret_cast<uint8_t*>(buff->data()) + sizeof(mock_header_t);
+
+ const uint8_t* pkt_end =
+ reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size();
+ const size_t pyld_pkt_len =
+ pkt_end - reinterpret_cast<const uint8_t*>(info.payload);
+
+ if (pyld_pkt_len < info.payload_bytes) {
+ _recv_link->release_recv_buff(std::move(buff));
+ throw uhd::value_error("Bad header or invalid packet length.");
+ }
+
+ const bool seq_match = header.seq_num == _seq_num;
+ const bool seq_error = !header.ignore_seq && !seq_match;
+ _seq_num = header.seq_num + 1;
+
+ return std::make_tuple(std::move(buff), info, seq_error);
+ }
+
+ void release_recv_buff(frame_buff::uptr buff)
+ {
+ _recv_link->release_recv_buff(std::move(buff));
+ }
+
+ size_t get_max_payload_size() const
+ {
+ return _recv_link->get_recv_frame_size() - sizeof(packet_info_t);
+ }
+
+private:
+ mock_recv_link::sptr _recv_link;
+ size_t _seq_num = 0;
+};
+
+/*!
+ * Mock rx streamer for testing
+ */
+class mock_rx_streamer : public rx_streamer_impl<mock_rx_data_xport>
+{
+public:
+ mock_rx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args)
+ : rx_streamer_impl(num_chans, stream_args)
+ {
+ }
+
+ void issue_stream_cmd(const stream_cmd_t&) {}
+
+ void set_tick_rate(double rate)
+ {
+ rx_streamer_impl::set_tick_rate(rate);
+ }
+
+ void set_samp_rate(double rate)
+ {
+ rx_streamer_impl::set_samp_rate(rate);
+ }
+
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ rx_streamer_impl::set_scale_factor(chan, scale_factor);
+ }
+};
+
+}} // namespace uhd::transport
+
+using namespace uhd::transport;
+
+using rx_streamer = rx_streamer_impl<mock_rx_data_xport>;
+
+static const double TICK_RATE = 100e6;
+static const double SAMP_RATE = 10e6;
+static const size_t FRAME_SIZE = 1000;
+static const double SCALE_FACTOR = 2;
+
+/*!
+ * Helper functions
+ */
+static std::vector<mock_recv_link::sptr> make_links(const size_t num)
+{
+ const mock_recv_link::link_params params = {FRAME_SIZE, 1};
+
+ std::vector<mock_recv_link::sptr> links;
+
+ for (size_t i = 0; i < num; i++) {
+ links.push_back(std::make_shared<mock_recv_link>(params));
+ }
+
+ return links;
+}
+
+static boost::shared_ptr<mock_rx_streamer> make_rx_streamer(
+ std::vector<mock_recv_link::sptr> recv_links,
+ const std::string& host_format,
+ const std::string& otw_format = "sc16")
+{
+ const uhd::stream_args_t stream_args(host_format, otw_format);
+ auto streamer = boost::make_shared<mock_rx_streamer>(recv_links.size(), stream_args);
+ streamer->set_tick_rate(TICK_RATE);
+ streamer->set_samp_rate(SAMP_RATE);
+
+ for (size_t i = 0; i < recv_links.size(); i++) {
+ mock_rx_data_xport::uptr xport(
+ std::make_unique<mock_rx_data_xport>(recv_links[i]));
+
+ streamer->set_scale_factor(i, SCALE_FACTOR);
+ streamer->connect_channel(i, std::move(xport));
+ }
+
+ return streamer;
+}
+
+static void push_back_recv_packet(mock_recv_link::sptr recv_link,
+ mock_header_t header,
+ size_t num_samps,
+ uint16_t start_data = 0)
+{
+ // Allocate buffer
+ const size_t pyld_bytes = num_samps * sizeof(std::complex<uint16_t>);
+ const size_t buff_len = sizeof(header) + pyld_bytes;
+ boost::shared_array<uint8_t> data(new uint8_t[buff_len]);
+
+ // Write header to buffer
+ header.payload_bytes = pyld_bytes;
+ *(reinterpret_cast<mock_header_t*>(data.get())) = header;
+
+ // Write data to buffer
+ auto data_ptr =
+ reinterpret_cast<std::complex<uint16_t>*>(data.get() + sizeof(header));
+
+ for (size_t i = 0; i < num_samps; i++) {
+ uint16_t val = (start_data + i) * 2;
+ data_ptr[i] = std::complex<uint16_t>(val, val + 1);
+ }
+
+ // Push back buffer for link to recv
+ recv_link->push_back_recv_packet(data, buff_len);
+}
+
+/*!
+ * Tests
+ */
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ const bool even_iteration = (i % 2 == 0);
+ const bool odd_iteration = (i % 2 != 0);
+ mock_header_t header;
+ header.eob = even_iteration;
+ header.has_tsf = odd_iteration;
+ header.tsf = i;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ for (size_t j = 0; j < num_samps; j++) {
+ const auto value =
+ std::complex<float>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[j]);
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet)
+{
+ const size_t NUM_BUFFS_TO_TEST = 5;
+ const std::string format("fc64");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * 4;
+ std::vector<std::complex<double>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.tsf = i;
+
+ size_t samps_written = 0;
+ while (samps_written < num_samps) {
+ size_t samps_to_write = std::min(num_samps - samps_written, spp);
+ push_back_recv_packet(recv_links[0], header, samps_to_write, samps_written);
+ samps_written += samps_to_write;
+ }
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, false);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ for (size_t j = 0; j < num_samps; j++) {
+ const auto value =
+ std::complex<double>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[j]);
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet_with_eob)
+{
+ // EOB should terminate a multi-packet recv, test that it does
+ const std::string format("sc16");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_packets = 4;
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * num_packets;
+ std::vector<std::complex<double>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ // Queue 4 packets, with eob set in every other packet
+ for (size_t i = 0; i < num_packets; i++) {
+ mock_header_t header;
+ header.has_tsf = false;
+ header.eob = (i % 2) != 0;
+ push_back_recv_packet(recv_links[0], header, spp);
+ }
+
+ // Now call recv and check that eob terminates a recv call
+ for (size_t i = 0; i < num_packets / 2; i++) {
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, spp * 2);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, false);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_two_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("sc16");
+
+ const size_t num_chans = 2;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+
+ std::vector<std::vector<std::complex<uint16_t>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ const bool even_iteration = (i % 2 == 0);
+ const bool odd_iteration = (i % 2 != 0);
+ mock_header_t header;
+ header.eob = even_iteration;
+ header.has_tsf = odd_iteration;
+ header.tsf = i;
+
+ size_t samps_pushed = 0;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps, samps_pushed);
+ samps_pushed += num_samps;
+ }
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ size_t samps_checked = 0;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ for (size_t samp = 0; samp < num_samps; samp++) {
+ const size_t n = samps_checked + samp;
+ const auto value = std::complex<uint16_t>((n * 2), (n * 2 + 1));
+ BOOST_CHECK_EQUAL(value, buffer[ch][samp]);
+ }
+ samps_checked += num_samps;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_packet_fragment)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ // Push back five packets, then read them 1/4 of a packet at a time
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t reads_per_packet = 4;
+ const size_t num_samps = spp / reads_per_packet;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = true;
+ header.has_tsf = true;
+ header.tsf = 0;
+ push_back_recv_packet(recv_links[0], header, num_samps * reads_per_packet);
+ }
+
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t total_samps_read = 0;
+ for (size_t j = 0; j < reads_per_packet; j++) {
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.more_fragments, j != reads_per_packet - 1);
+ BOOST_CHECK_EQUAL(metadata.fragment_offset, total_samps_read);
+
+ const size_t ticks_per_sample = static_cast<size_t>(TICK_RATE / SAMP_RATE);
+ const size_t expected_ticks = ticks_per_sample * total_samps_read;
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), expected_ticks);
+
+ for (size_t samp = 0; samp < num_samps; samp++) {
+ const size_t pkt_idx = samp + total_samps_read;
+ const auto value = std::complex<float>(
+ (pkt_idx * 2) * SCALE_FACTOR, (pkt_idx * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[samp]);
+ }
+
+ total_samps_read += num_samps_ret;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_seq_error)
+{
+ // Test that when we get a sequence error the error is returned in the
+ // metadata with a time spec that corresponds to the time spec of the
+ // last sample in the previous packet plus one sample clock. Test that
+ // the packet that causes the sequence error is not discarded.
+ const size_t NUM_PKTS_TO_TEST = 2;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+ size_t seq_num = 0;
+ size_t tsf = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.ignore_seq = false;
+
+ // Push back three packets but skip a seq_num after the second
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ tsf += num_samps;
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ seq_num++; // dropped packet
+ tsf += num_samps;
+
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // First two reads should succeed
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ size_t prev_tsf = metadata.time_spec.to_ticks(TICK_RATE);
+ size_t expected_tsf = prev_tsf + num_samps * (TICK_RATE / SAMP_RATE);
+
+ // Third read should be a sequence error
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+ size_t metadata_tsf = metadata.time_spec.to_ticks(TICK_RATE);
+ BOOST_CHECK_EQUAL(metadata_tsf, expected_tsf);
+
+ // Next read should succeed
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, false);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_bad_packet)
+{
+ // Test that when we receive a packet with invalid chdr header or length
+ // the streamer returns the correct error in meatadata.
+ auto push_back_bad_packet = [](mock_recv_link::sptr recv_link) {
+ mock_header_t header;
+ header.payload_bytes = 1000;
+
+ // Allocate a buffer that is too small for the payload
+ const size_t buff_len = 100;
+ boost::shared_array<uint8_t> data(new uint8_t[buff_len]);
+
+ // Write header to buffer
+ *(reinterpret_cast<mock_header_t*>(data.get())) = header;
+
+ // Push back buffer for link to recv
+ recv_link->push_back_recv_packet(data, buff_len);
+ };
+
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ mock_header_t header;
+
+ // Push back a regular packet
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // Push back a bad packet
+ push_back_bad_packet(recv_links[0]);
+
+ // Push back another regular packet
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // First read should succeed
+ size_t num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+
+ // Second read should be an error
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET);
+
+ // Third read should succeed
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_multi_channel_no_tsf)
+{
+ // Test that we can receive packets without tsf. Start by pushing
+ // a packet with a tsf followed by a few packets without.
+ const size_t NUM_PKTS_TO_TEST = 6;
+ const std::string format("fc64");
+
+ const size_t num_chans = 10;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 21;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = (i == NUM_PKTS_TO_TEST - 1);
+ header.has_tsf = (i == 0);
+ header.tsf = 500;
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, i == NUM_PKTS_TO_TEST - 1);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, i == 0);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_multi_channel_seq_error)
+{
+ // Test that the streamer handles dropped packets correctly by injecting
+ // a sequence error in one channel. The streamer should discard
+ // corresponding packets from all other channels.
+ const std::string format("fc64");
+
+ const size_t num_chans = 100;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 99;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.tsf = 0;
+ header.ignore_seq = false;
+ header.seq_num = 0;
+
+ // Drop a packet from an arbitrary channel right at the start
+ if (ch != num_chans / 2) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ // Add a regular packet to check the streamer drops the first
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+
+ // Drop a packet from the first channel
+ header.seq_num++;
+ header.tsf++;
+ if (ch != 0) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ // Add a regular packet
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+
+ // Drop a few packets from the last channel
+ for (size_t j = 0; j < 10; j++) {
+ header.seq_num++;
+ header.tsf++;
+ if (ch != num_chans - 1) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+ }
+
+ // Add a regular packet
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ // First recv should result in error
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 1 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 1);
+
+ // Next recv should result in error
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 3 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 3);
+
+ // Next recv should result in error
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 14 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 14);
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_alignment_error)
+{
+ // Test that the alignment procedure returns an alignment error if it can't
+ // time align packets.
+ const std::string format("fc64");
+
+ const size_t num_chans = 4;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 2;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ mock_header_t header;
+ header.eob = true;
+ header.has_tsf = true;
+ header.tsf = 500;
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+
+ for (size_t pkt = 0; pkt < uhd::transport::ALIGNMENT_FAILURE_THRESHOLD; pkt++) {
+ header.tsf = header.tsf + num_samps;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ if (ch == num_chans - 1) {
+ // Misalign this time stamp
+ header.tsf += 1;
+ }
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+ }
+
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT);
+}
diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp
new file mode 100644
index 000000000..cb07cffad
--- /dev/null
+++ b/host/tests/tx_streamer_test.cpp
@@ -0,0 +1,393 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "../common/mock_link.hpp"
+#include <uhdlib/transport/tx_streamer_impl.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Mock tx data xport which doesn't use I/O service, and just interacts with
+ * the link directly. Transport copies packet info directly into the frame
+ * buffer.
+ */
+class mock_tx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<mock_tx_data_xport>;
+ using buff_t = uhd::transport::frame_buff;
+
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ };
+
+ mock_tx_data_xport(mock_send_link::sptr send_link) : _send_link(send_link) {}
+
+ buff_t::uptr get_send_buff(const int32_t timeout_ms)
+ {
+ return _send_link->get_send_buff(timeout_ms);
+ }
+
+ std::pair<void*, size_t> write_packet_header(
+ buff_t::uptr& buff, const packet_info_t& info)
+ {
+ uint8_t* data = static_cast<uint8_t*>(buff->data());
+ *(reinterpret_cast<packet_info_t*>(data)) = info;
+ return std::make_pair(data + sizeof(info), sizeof(info) + info.payload_bytes);
+ }
+
+ void release_send_buff(buff_t::uptr buff)
+ {
+ _send_link->release_send_buff(std::move(buff));
+ }
+
+ size_t get_max_payload_size() const
+ {
+ return _send_link->get_send_frame_size() - sizeof(packet_info_t);
+ ;
+ }
+
+private:
+ mock_send_link::sptr _send_link;
+};
+
+/*!
+ * Mock tx streamer for testing
+ */
+class mock_tx_streamer : public tx_streamer_impl<mock_tx_data_xport>
+{
+public:
+ mock_tx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args)
+ : tx_streamer_impl(num_chans, stream_args)
+ {
+ }
+
+ void set_tick_rate(double rate)
+ {
+ tx_streamer_impl::set_tick_rate(rate);
+ }
+
+ void set_samp_rate(double rate)
+ {
+ tx_streamer_impl::set_samp_rate(rate);
+ }
+
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ tx_streamer_impl::set_scale_factor(chan, scale_factor);
+ }
+};
+
+}} // namespace uhd::transport
+
+using namespace uhd::transport;
+
+using tx_streamer = tx_streamer_impl<mock_tx_data_xport>;
+
+static const double TICK_RATE = 100e6;
+static const double SAMP_RATE = 10e6;
+static const size_t FRAME_SIZE = 1000;
+static const double SCALE_FACTOR = 2;
+
+/*!
+ * Helper functions
+ */
+static std::vector<mock_send_link::sptr> make_links(const size_t num)
+{
+ const mock_send_link::link_params params = {FRAME_SIZE, 1};
+
+ std::vector<mock_send_link::sptr> links;
+
+ for (size_t i = 0; i < num; i++) {
+ links.push_back(std::make_shared<mock_send_link>(params));
+ }
+
+ return links;
+}
+
+static boost::shared_ptr<mock_tx_streamer> make_tx_streamer(
+ std::vector<mock_send_link::sptr> send_links, const std::string& format)
+{
+ const uhd::stream_args_t stream_args(format, "sc16");
+ auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args);
+ streamer->set_tick_rate(TICK_RATE);
+ streamer->set_samp_rate(SAMP_RATE);
+
+ for (size_t i = 0; i < send_links.size(); i++) {
+ mock_tx_data_xport::uptr xport(
+ std::make_unique<mock_tx_data_xport>(send_links[i]));
+
+ streamer->set_scale_factor(i, SCALE_FACTOR);
+ streamer->connect_channel(i, std::move(xport));
+ }
+
+ return streamer;
+}
+
+std::tuple<mock_tx_data_xport::packet_info_t, std::complex<uint16_t>*, size_t, boost::shared_array<uint8_t>>
+pop_send_packet(mock_send_link::sptr send_link)
+{
+ auto packet = send_link->pop_send_packet();
+
+ const size_t packet_samps =
+ (packet.second - sizeof(mock_tx_data_xport::packet_info_t))
+ / sizeof(std::complex<uint16_t>);
+
+ uint8_t* buff_ptr = packet.first.get();
+ auto info = *(reinterpret_cast<mock_tx_data_xport::packet_info_t*>(buff_ptr));
+
+ std::complex<uint16_t>* data = reinterpret_cast<std::complex<uint16_t>*>(
+ buff_ptr + sizeof(mock_tx_data_xport::packet_info_t));
+
+ return std::make_tuple(info, data, packet_samps, packet.first);
+}
+
+/*!
+ * Tests
+ */
+BOOST_AUTO_TEST_CASE(test_send_one_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 30;
+ const std::string format("fc32");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ std::vector<std::complex<float>> buff(20);
+ for (size_t i = 0; i < buff.size(); i++) {
+ buff[i] = std::complex<float>(i * 2, i * 2 + 1);
+ }
+
+ // Send packets and check data
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ // Vary num_samps for each packet
+ const size_t num_samps = 10 + i % 10;
+ metadata.end_of_burst = (i == NUM_PKTS_TO_TEST - 1);
+ const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]);
+ BOOST_CHECK_EQUAL(num_samps, packet_samps);
+
+ // Check data
+ for (size_t j = 0; j < num_samps; j++) {
+ const std::complex<uint16_t> value(
+ (j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, data[j]);
+ }
+
+ BOOST_CHECK_EQUAL(num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>));
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE);
+ BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1);
+ num_accum_samps += num_samps;
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_send_one_channel_multi_packet)
+{
+ const size_t NUM_BUFFS_TO_TEST = 5;
+ const std::string format("fc64");
+
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * 4;
+ std::vector<std::complex<double>> buff(num_samps);
+ for (size_t i = 0; i < buff.size(); i++) {
+ buff[i] = std::complex<double>(i * 2, i * 2 + 1);
+ }
+
+ // Send packets and check data
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ metadata.end_of_burst = true;
+ const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ size_t samps_checked = 0;
+
+ while (samps_checked < num_samps) {
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]);
+
+ for (size_t j = 0; j < packet_samps; j++) {
+ const size_t n = j + samps_checked;
+ const std::complex<uint16_t> value(
+ (n * 2) * SCALE_FACTOR, (n * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, data[j]);
+ }
+
+ BOOST_CHECK_EQUAL(
+ packet_samps, info.payload_bytes / sizeof(std::complex<uint16_t>));
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(
+ info.tsf, (num_accum_samps + samps_checked) * TICK_RATE / SAMP_RATE);
+ samps_checked += packet_samps;
+
+ BOOST_CHECK_EQUAL(info.eob, samps_checked == num_samps);
+ }
+
+ BOOST_CHECK_EQUAL(samps_checked, num_samps);
+ num_accum_samps += samps_checked;
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_send_two_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 30;
+ const std::string format("sc16");
+
+ auto send_links = make_links(2);
+ auto streamer = make_tx_streamer(send_links, format);
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ std::vector<std::complex<uint16_t>> buff(20);
+ for (size_t i = 0; i < buff.size(); i++) {
+ buff[i] = std::complex<uint16_t>(i * 2, i * 2 + 1);
+ }
+ std::vector<void*> buffs;
+ for (size_t ch = 0; ch < 2; ch++) {
+ buffs.push_back(buff.data()); // same buffer for each channel
+ }
+
+ // Send packets and check data
+ size_t num_accum_samps = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "sending packet " << i << std::endl;
+
+ // Vary num_samps for each packet
+ const size_t num_samps = 10 + i % 10;
+ metadata.end_of_burst = (i == NUM_PKTS_TO_TEST - 1);
+ const size_t num_sent = streamer->send(buffs, num_samps, metadata, 1.0);
+ BOOST_CHECK_EQUAL(num_sent, num_samps);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+
+ for (size_t ch = 0; ch < 2; ch++) {
+ mock_tx_data_xport::packet_info_t info;
+ std::complex<uint16_t>* data;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[ch]);
+ BOOST_CHECK_EQUAL(num_samps, packet_samps);
+
+ // Check data
+ for (size_t j = 0; j < num_samps; j++) {
+ const std::complex<uint16_t> value((j * 2), (j * 2 + 1));
+ BOOST_CHECK_EQUAL(value, data[j]);
+ }
+
+ BOOST_CHECK_EQUAL(
+ num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>));
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE);
+ BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1);
+ }
+ num_accum_samps += num_samps;
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_meta_data_cache)
+{
+ auto send_links = make_links(1);
+ auto streamer = make_tx_streamer(send_links, "fc32");
+
+ // Allocate metadata
+ uhd::tx_metadata_t metadata;
+ metadata.start_of_burst = true;
+ metadata.end_of_burst = true;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ // Allocate buffer and write data
+ std::vector<std::complex<float>> buff(20);
+
+ size_t num_sent = streamer->send(buff.data(), 0, metadata, 1.0);
+ BOOST_CHECK_EQUAL(send_links[0]->get_num_packets(), 0);
+ BOOST_CHECK_EQUAL(num_sent, 0);
+ uhd::tx_metadata_t metadata2;
+ num_sent = streamer->send(buff.data(), 10, metadata2, 1.0);
+
+ mock_tx_data_xport::packet_info_t info;
+ size_t packet_samps;
+ boost::shared_array<uint8_t> frame_buff;
+
+ std::tie(info, std::ignore, packet_samps, frame_buff) = pop_send_packet(send_links[0]);
+ BOOST_CHECK_EQUAL(packet_samps, num_sent);
+ BOOST_CHECK(info.has_tsf);
+ BOOST_CHECK(info.eob);
+}
+
+BOOST_AUTO_TEST_CASE(test_spp)
+{
+ // Test the spp calculation when it is limited by the stream args
+ {
+ auto send_links = make_links(1);
+ uhd::stream_args_t stream_args("fc64", "sc16");
+ stream_args.args["spp"] = std::to_string(10);
+ auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args);
+ mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0]));
+ streamer->connect_channel(0, std::move(xport));
+ BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), 10);
+ }
+
+ // Test the spp calculation when it is limited by the frame size
+ {
+ auto send_links = make_links(1);
+ uhd::stream_args_t stream_args("fc64", "sc16");
+ stream_args.args["spp"] = std::to_string(10000);
+ auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args);
+ mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0]));
+ const size_t max_pyld = xport->get_max_payload_size();
+ streamer->connect_channel(0, std::move(xport));
+ BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), max_pyld / sizeof(std::complex<uint16_t>));
+ }
+}