aboutsummaryrefslogtreecommitdiffstats
path: root/host/tests/rx_streamer_test.cpp
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-05-23 20:38:07 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:29 -0800
commit75a090543b8fb8e7c875387eee6d3fe7227e4450 (patch)
tree2904b48607cc07158aa6b068ada35ab56c4da516 /host/tests/rx_streamer_test.cpp
parentd8e9705bc6c34b8d015b56a76955ee2f15426bd8 (diff)
downloaduhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.gz
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.tar.bz2
uhd-75a090543b8fb8e7c875387eee6d3fe7227e4450.zip
rfnoc: add rx and tx transports, and amend rfnoc_graph
transports: Transports build on I/O service and implements flow control and sequence number checking. The rx streamer subclass extends the streamer implementation to connect it to the rfnoc graph. It receives configuration values from property propagation and configures the streamer accordingly. It also implements the issue_stream_cmd rx_streamer API method. Add implementation of rx streamer creation and method to connect it to an rfnoc block. rfnoc_graph: Cache more connection info, clarify contract Summary of changes: - rfnoc_graph stores more information about static connections at the beginning. Some search algorithms are replaced by simpler lookups. - The contract for connect() was clarified. It is required to call connect, even for static connections.
Diffstat (limited to 'host/tests/rx_streamer_test.cpp')
-rw-r--r--host/tests/rx_streamer_test.cpp744
1 files changed, 744 insertions, 0 deletions
diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp
new file mode 100644
index 000000000..cd4daf569
--- /dev/null
+++ b/host/tests/rx_streamer_test.cpp
@@ -0,0 +1,744 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include "../common/mock_link.hpp"
+#include <uhdlib/transport/rx_streamer_impl.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+
+namespace uhd { namespace transport {
+
+/*!
+ * Contents of mock packet header
+ */
+struct mock_header_t
+{
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ bool ignore_seq = true;
+ size_t seq_num = 0;
+};
+
+/*!
+ * Mock rx data xport which doesn't use I/O service, and just interacts with
+ * the link directly.
+ */
+class mock_rx_data_xport
+{
+public:
+ using uptr = std::unique_ptr<mock_rx_data_xport>;
+ using buff_t = uhd::transport::frame_buff;
+
+ //! Values extracted from received RX data packets
+ struct packet_info_t
+ {
+ bool eob = false;
+ bool has_tsf = false;
+ uint64_t tsf = 0;
+ size_t payload_bytes = 0;
+ const void* payload = nullptr;
+ };
+
+ mock_rx_data_xport(mock_recv_link::sptr recv_link) : _recv_link(recv_link) {}
+
+ std::tuple<frame_buff::uptr, packet_info_t, bool> get_recv_buff(
+ const int32_t timeout_ms)
+ {
+ frame_buff::uptr buff = _recv_link->get_recv_buff(timeout_ms);
+ mock_header_t header = *(reinterpret_cast<mock_header_t*>(buff->data()));
+
+ packet_info_t info;
+ info.eob = header.eob;
+ info.has_tsf = header.has_tsf;
+ info.tsf = header.tsf;
+ info.payload_bytes = header.payload_bytes;
+ info.payload = reinterpret_cast<uint8_t*>(buff->data()) + sizeof(mock_header_t);
+
+ const uint8_t* pkt_end =
+ reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size();
+ const size_t pyld_pkt_len =
+ pkt_end - reinterpret_cast<const uint8_t*>(info.payload);
+
+ if (pyld_pkt_len < info.payload_bytes) {
+ _recv_link->release_recv_buff(std::move(buff));
+ throw uhd::value_error("Bad header or invalid packet length.");
+ }
+
+ const bool seq_match = header.seq_num == _seq_num;
+ const bool seq_error = !header.ignore_seq && !seq_match;
+ _seq_num = header.seq_num + 1;
+
+ return std::make_tuple(std::move(buff), info, seq_error);
+ }
+
+ void release_recv_buff(frame_buff::uptr buff)
+ {
+ _recv_link->release_recv_buff(std::move(buff));
+ }
+
+ size_t get_max_payload_size() const
+ {
+ return _recv_link->get_recv_frame_size() - sizeof(packet_info_t);
+ }
+
+private:
+ mock_recv_link::sptr _recv_link;
+ size_t _seq_num = 0;
+};
+
+/*!
+ * Mock rx streamer for testing
+ */
+class mock_rx_streamer : public rx_streamer_impl<mock_rx_data_xport>
+{
+public:
+ mock_rx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args)
+ : rx_streamer_impl(num_chans, stream_args)
+ {
+ }
+
+ void issue_stream_cmd(const stream_cmd_t&) {}
+
+ void set_tick_rate(double rate)
+ {
+ rx_streamer_impl::set_tick_rate(rate);
+ }
+
+ void set_samp_rate(double rate)
+ {
+ rx_streamer_impl::set_samp_rate(rate);
+ }
+
+ void set_scale_factor(const size_t chan, const double scale_factor)
+ {
+ rx_streamer_impl::set_scale_factor(chan, scale_factor);
+ }
+};
+
+}} // namespace uhd::transport
+
+using namespace uhd::transport;
+
+using rx_streamer = rx_streamer_impl<mock_rx_data_xport>;
+
+static const double TICK_RATE = 100e6;
+static const double SAMP_RATE = 10e6;
+static const size_t FRAME_SIZE = 1000;
+static const double SCALE_FACTOR = 2;
+
+/*!
+ * Helper functions
+ */
+static std::vector<mock_recv_link::sptr> make_links(const size_t num)
+{
+ const mock_recv_link::link_params params = {FRAME_SIZE, 1};
+
+ std::vector<mock_recv_link::sptr> links;
+
+ for (size_t i = 0; i < num; i++) {
+ links.push_back(std::make_shared<mock_recv_link>(params));
+ }
+
+ return links;
+}
+
+static boost::shared_ptr<mock_rx_streamer> make_rx_streamer(
+ std::vector<mock_recv_link::sptr> recv_links,
+ const std::string& host_format,
+ const std::string& otw_format = "sc16")
+{
+ const uhd::stream_args_t stream_args(host_format, otw_format);
+ auto streamer = boost::make_shared<mock_rx_streamer>(recv_links.size(), stream_args);
+ streamer->set_tick_rate(TICK_RATE);
+ streamer->set_samp_rate(SAMP_RATE);
+
+ for (size_t i = 0; i < recv_links.size(); i++) {
+ mock_rx_data_xport::uptr xport(
+ std::make_unique<mock_rx_data_xport>(recv_links[i]));
+
+ streamer->set_scale_factor(i, SCALE_FACTOR);
+ streamer->connect_channel(i, std::move(xport));
+ }
+
+ return streamer;
+}
+
+static void push_back_recv_packet(mock_recv_link::sptr recv_link,
+ mock_header_t header,
+ size_t num_samps,
+ uint16_t start_data = 0)
+{
+ // Allocate buffer
+ const size_t pyld_bytes = num_samps * sizeof(std::complex<uint16_t>);
+ const size_t buff_len = sizeof(header) + pyld_bytes;
+ boost::shared_array<uint8_t> data(new uint8_t[buff_len]);
+
+ // Write header to buffer
+ header.payload_bytes = pyld_bytes;
+ *(reinterpret_cast<mock_header_t*>(data.get())) = header;
+
+ // Write data to buffer
+ auto data_ptr =
+ reinterpret_cast<std::complex<uint16_t>*>(data.get() + sizeof(header));
+
+ for (size_t i = 0; i < num_samps; i++) {
+ uint16_t val = (start_data + i) * 2;
+ data_ptr[i] = std::complex<uint16_t>(val, val + 1);
+ }
+
+ // Push back buffer for link to recv
+ recv_link->push_back_recv_packet(data, buff_len);
+}
+
+/*!
+ * Tests
+ */
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ const bool even_iteration = (i % 2 == 0);
+ const bool odd_iteration = (i % 2 != 0);
+ mock_header_t header;
+ header.eob = even_iteration;
+ header.has_tsf = odd_iteration;
+ header.tsf = i;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ for (size_t j = 0; j < num_samps; j++) {
+ const auto value =
+ std::complex<float>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[j]);
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet)
+{
+ const size_t NUM_BUFFS_TO_TEST = 5;
+ const std::string format("fc64");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * 4;
+ std::vector<std::complex<double>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.tsf = i;
+
+ size_t samps_written = 0;
+ while (samps_written < num_samps) {
+ size_t samps_to_write = std::min(num_samps - samps_written, spp);
+ push_back_recv_packet(recv_links[0], header, samps_to_write, samps_written);
+ samps_written += samps_to_write;
+ }
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, false);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ for (size_t j = 0; j < num_samps; j++) {
+ const auto value =
+ std::complex<double>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[j]);
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet_with_eob)
+{
+ // EOB should terminate a multi-packet recv, test that it does
+ const std::string format("sc16");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_packets = 4;
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t num_samps = spp * num_packets;
+ std::vector<std::complex<double>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ // Queue 4 packets, with eob set in every other packet
+ for (size_t i = 0; i < num_packets; i++) {
+ mock_header_t header;
+ header.has_tsf = false;
+ header.eob = (i % 2) != 0;
+ push_back_recv_packet(recv_links[0], header, spp);
+ }
+
+ // Now call recv and check that eob terminates a recv call
+ for (size_t i = 0; i < num_packets / 2; i++) {
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, spp * 2);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, false);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_two_channel_one_packet)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("sc16");
+
+ const size_t num_chans = 2;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+
+ std::vector<std::vector<std::complex<uint16_t>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ const bool even_iteration = (i % 2 == 0);
+ const bool odd_iteration = (i % 2 != 0);
+ mock_header_t header;
+ header.eob = even_iteration;
+ header.has_tsf = odd_iteration;
+ header.tsf = i;
+
+ size_t samps_pushed = 0;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps, samps_pushed);
+ samps_pushed += num_samps;
+ }
+
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i);
+
+ size_t samps_checked = 0;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ for (size_t samp = 0; samp < num_samps; samp++) {
+ const size_t n = samps_checked + samp;
+ const auto value = std::complex<uint16_t>((n * 2), (n * 2 + 1));
+ BOOST_CHECK_EQUAL(value, buffer[ch][samp]);
+ }
+ samps_checked += num_samps;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_one_channel_packet_fragment)
+{
+ const size_t NUM_PKTS_TO_TEST = 5;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ // Push back five packets, then read them 1/4 of a packet at a time
+ const size_t spp = streamer->get_max_num_samps();
+ const size_t reads_per_packet = 4;
+ const size_t num_samps = spp / reads_per_packet;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = true;
+ header.has_tsf = true;
+ header.tsf = 0;
+ push_back_recv_packet(recv_links[0], header, num_samps * reads_per_packet);
+ }
+
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ std::cout << "receiving packet " << i << std::endl;
+
+ size_t total_samps_read = 0;
+ for (size_t j = 0; j < reads_per_packet; j++) {
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.more_fragments, j != reads_per_packet - 1);
+ BOOST_CHECK_EQUAL(metadata.fragment_offset, total_samps_read);
+
+ const size_t ticks_per_sample = static_cast<size_t>(TICK_RATE / SAMP_RATE);
+ const size_t expected_ticks = ticks_per_sample * total_samps_read;
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), expected_ticks);
+
+ for (size_t samp = 0; samp < num_samps; samp++) {
+ const size_t pkt_idx = samp + total_samps_read;
+ const auto value = std::complex<float>(
+ (pkt_idx * 2) * SCALE_FACTOR, (pkt_idx * 2 + 1) * SCALE_FACTOR);
+ BOOST_CHECK_EQUAL(value, buff[samp]);
+ }
+
+ total_samps_read += num_samps_ret;
+ }
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_seq_error)
+{
+ // Test that when we get a sequence error the error is returned in the
+ // metadata with a time spec that corresponds to the time spec of the
+ // last sample in the previous packet plus one sample clock. Test that
+ // the packet that causes the sequence error is not discarded.
+ const size_t NUM_PKTS_TO_TEST = 2;
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+ size_t seq_num = 0;
+ size_t tsf = 0;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.ignore_seq = false;
+
+ // Push back three packets but skip a seq_num after the second
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ tsf += num_samps;
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ seq_num++; // dropped packet
+ tsf += num_samps;
+
+ header.seq_num = seq_num++;
+ header.tsf = tsf;
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // First two reads should succeed
+ size_t num_samps_ret =
+ streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ size_t prev_tsf = metadata.time_spec.to_ticks(TICK_RATE);
+ size_t expected_tsf = prev_tsf + num_samps * (TICK_RATE / SAMP_RATE);
+
+ // Third read should be a sequence error
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+ size_t metadata_tsf = metadata.time_spec.to_ticks(TICK_RATE);
+ BOOST_CHECK_EQUAL(metadata_tsf, expected_tsf);
+
+ // Next read should succeed
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, false);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_bad_packet)
+{
+ // Test that when we receive a packet with invalid chdr header or length
+ // the streamer returns the correct error in meatadata.
+ auto push_back_bad_packet = [](mock_recv_link::sptr recv_link) {
+ mock_header_t header;
+ header.payload_bytes = 1000;
+
+ // Allocate a buffer that is too small for the payload
+ const size_t buff_len = 100;
+ boost::shared_array<uint8_t> data(new uint8_t[buff_len]);
+
+ // Write header to buffer
+ *(reinterpret_cast<mock_header_t*>(data.get())) = header;
+
+ // Push back buffer for link to recv
+ recv_link->push_back_recv_packet(data, buff_len);
+ };
+
+ const std::string format("fc32");
+
+ auto recv_links = make_links(1);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 20;
+ std::vector<std::complex<float>> buff(num_samps);
+ uhd::rx_metadata_t metadata;
+
+ mock_header_t header;
+
+ // Push back a regular packet
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // Push back a bad packet
+ push_back_bad_packet(recv_links[0]);
+
+ // Push back another regular packet
+ push_back_recv_packet(recv_links[0], header, num_samps);
+
+ // First read should succeed
+ size_t num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+
+ // Second read should be an error
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET);
+
+ // Third read should succeed
+ num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_multi_channel_no_tsf)
+{
+ // Test that we can receive packets without tsf. Start by pushing
+ // a packet with a tsf followed by a few packets without.
+ const size_t NUM_PKTS_TO_TEST = 6;
+ const std::string format("fc64");
+
+ const size_t num_chans = 10;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 21;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) {
+ mock_header_t header;
+ header.eob = (i == NUM_PKTS_TO_TEST - 1);
+ header.has_tsf = (i == 0);
+ header.tsf = 500;
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, i == NUM_PKTS_TO_TEST - 1);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, i == 0);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_multi_channel_seq_error)
+{
+ // Test that the streamer handles dropped packets correctly by injecting
+ // a sequence error in one channel. The streamer should discard
+ // corresponding packets from all other channels.
+ const std::string format("fc64");
+
+ const size_t num_chans = 100;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 99;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ mock_header_t header;
+ header.eob = false;
+ header.has_tsf = true;
+ header.tsf = 0;
+ header.ignore_seq = false;
+ header.seq_num = 0;
+
+ // Drop a packet from an arbitrary channel right at the start
+ if (ch != num_chans / 2) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ // Add a regular packet to check the streamer drops the first
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+
+ // Drop a packet from the first channel
+ header.seq_num++;
+ header.tsf++;
+ if (ch != 0) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ // Add a regular packet
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+
+ // Drop a few packets from the last channel
+ for (size_t j = 0; j < 10; j++) {
+ header.seq_num++;
+ header.tsf++;
+ if (ch != num_chans - 1) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+ }
+
+ // Add a regular packet
+ header.seq_num++;
+ header.tsf++;
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ // First recv should result in error
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 1 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 1);
+
+ // Next recv should result in error
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 3 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 3);
+
+ // Next recv should result in error
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_EQUAL(metadata.out_of_sequence, true);
+
+ // Packet with tsf == 14 should be returned next
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 14);
+}
+
+BOOST_AUTO_TEST_CASE(test_recv_alignment_error)
+{
+ // Test that the alignment procedure returns an alignment error if it can't
+ // time align packets.
+ const std::string format("fc64");
+
+ const size_t num_chans = 4;
+
+ auto recv_links = make_links(num_chans);
+ auto streamer = make_rx_streamer(recv_links, format);
+
+ const size_t num_samps = 2;
+
+ std::vector<std::vector<std::complex<double>>> buffer(num_chans);
+ std::vector<void*> buffers;
+ for (size_t i = 0; i < num_chans; i++) {
+ buffer[i].resize(num_samps);
+ buffers.push_back(&buffer[i].front());
+ }
+
+ uhd::rx_metadata_t metadata;
+
+ mock_header_t header;
+ header.eob = true;
+ header.has_tsf = true;
+ header.tsf = 500;
+
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+
+ size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+
+ BOOST_CHECK_EQUAL(num_samps_ret, num_samps);
+ BOOST_CHECK_EQUAL(metadata.end_of_burst, true);
+ BOOST_CHECK_EQUAL(metadata.has_time_spec, true);
+
+ for (size_t pkt = 0; pkt < uhd::transport::ALIGNMENT_FAILURE_THRESHOLD; pkt++) {
+ header.tsf = header.tsf + num_samps;
+ for (size_t ch = 0; ch < num_chans; ch++) {
+ if (ch == num_chans - 1) {
+ // Misalign this time stamp
+ header.tsf += 1;
+ }
+ push_back_recv_packet(recv_links[ch], header, num_samps);
+ }
+ }
+
+ num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false);
+ BOOST_CHECK_EQUAL(num_samps_ret, 0);
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT);
+}