aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/tests/CMakeLists.txt16
-rw-r--r--host/tests/common/mock_transport.hpp14
-rw-r--r--host/tests/dpdk_test.cpp174
3 files changed, 121 insertions, 83 deletions
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index 2c53e4905..d25c88400 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -118,6 +118,22 @@ endmacro(UHD_ADD_NONAPI_TEST)
if(ENABLE_DPDK)
find_package(DPDK)
UHD_ADD_NONAPI_TEST(
+ TARGET "dpdk_test.cpp"
+ EXTRA_SOURCES
+ ${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp
+ ${CMAKE_SOURCE_DIR}/lib/utils/paths.cpp
+ ${CMAKE_SOURCE_DIR}/lib/utils/pathslib.cpp
+ ${CMAKE_SOURCE_DIR}/lib/utils/prefs.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/adapter.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp
+ INCLUDE_DIRS
+ ${DPDK_INCLUDE_DIR}
+ EXTRA_LIBS ${DPDK_LIBRARIES}
+ NOAUTORUN # Don't register for auto-run, it requires special config
+ )
+ UHD_ADD_NONAPI_TEST(
TARGET "dpdk_port_test.cpp"
EXTRA_SOURCES
${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp
diff --git a/host/tests/common/mock_transport.hpp b/host/tests/common/mock_transport.hpp
index a8dc761f1..6ca48ee5f 100644
--- a/host/tests/common/mock_transport.hpp
+++ b/host/tests/common/mock_transport.hpp
@@ -7,6 +7,7 @@
#ifndef INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP
#define INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP
+#include <uhd/exception.hpp>
#include <uhdlib/transport/io_service.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <utility>
@@ -39,8 +40,7 @@ public:
uint16_t dst_addr,
uint16_t src_addr,
uint32_t credits)
- : _credits(credits)
- , _frame_size(send_link->get_send_frame_size())
+ : _credits(credits), _frame_size(send_link->get_send_frame_size())
{
_send_addr = (dst_addr << 16) | (src_addr << 0);
_recv_addr = (src_addr << 16) | (dst_addr << 0);
@@ -66,8 +66,8 @@ public:
send_link_if* /*send_link*/) {
return this->recv_buff(buff, link);
};
- send_io_if::fc_callback_t fc_cb = [this](size_t) {
- return this->_seqno < this->_ackno + this->_credits;
+ send_io_if::fc_callback_t fc_cb = [this](const size_t bytes) {
+ return this->can_send(bytes);
};
/* Pretend get 1 flow control message per sent packet */
@@ -156,6 +156,11 @@ public:
_seqno++;
}
+ bool can_send(size_t bytes)
+ {
+ return _seqno < _ackno + _credits;
+ };
+
/*!
* Callback for when packets are received (for processing).
* Function should make a determination of whether the packet belongs to it
@@ -312,6 +317,7 @@ public:
UHD_ASSERT_THROW(buff == nullptr);
fc_data[TYPE_OFFSET] = 1; /* FC type */
fc_data[ADDR_OFFSET] = _send_addr;
+ fc_buff->set_packet_size(3 * sizeof(uint32_t));
send_link->release_send_buff(std::move(fc_buff));
} else {
recv_link->release_recv_buff(std::move(buff));
diff --git a/host/tests/dpdk_test.cpp b/host/tests/dpdk_test.cpp
index c32e47824..35348207e 100644
--- a/host/tests/dpdk_test.cpp
+++ b/host/tests/dpdk_test.cpp
@@ -8,8 +8,11 @@
*/
-#include <uhdlib/transport/dpdk_zero_copy.hpp>
-#include <uhdlib/transport/uhd-dpdk.h>
+#include "common/mock_transport.hpp"
+#include <uhdlib/transport/dpdk/common.hpp>
+#include <uhdlib/transport/dpdk/service_queue.hpp>
+#include <uhdlib/transport/dpdk_io_service.hpp>
+#include <uhdlib/transport/udp_dpdk_link.hpp>
#include <arpa/inet.h>
#include <errno.h>
#include <sched.h>
@@ -39,7 +42,7 @@ struct dpdk_test_args
{
unsigned int portid;
std::string dst_ip;
- pthread_cond_t *cond;
+ pthread_cond_t* cond;
pthread_mutex_t mutex;
bool started;
int cpu;
@@ -54,10 +57,11 @@ struct dpdk_test_stats
uint32_t last_ackno;
uint32_t tx_seqno;
uint64_t tx_xfer;
+ uint32_t tx_no_bufs;
};
-static void process_udp(int id, uint32_t *udp_data, struct dpdk_test_stats *stats)
+static void process_udp(int id, uint32_t* udp_data, struct dpdk_test_stats* stats)
{
if (udp_data[0] != stats[id].last_seqno + 1) {
stats[id].lasts[stats[id].dropped_packets & 0xf] = stats[id].last_seqno;
@@ -69,38 +73,43 @@ static void process_udp(int id, uint32_t *udp_data, struct dpdk_test_stats *stat
stats[id].last_ackno = udp_data[1];
}
-static void send_udp(uhd::transport::dpdk_zero_copy::sptr& stream,
+static void send_udp(uhd::transport::mock_send_transport::sptr& stream,
int id,
bool fc_only,
- struct dpdk_test_stats *stats)
+ struct dpdk_test_stats* stats)
{
- uhd::transport::managed_send_buffer::sptr mbuf = stream->get_send_buff(0);
- if (mbuf.get() == nullptr) {
+ uhd::transport::frame_buff::uptr mbuf = stream->get_data_buff(0);
+ if (!mbuf) {
printf("Could not get TX buffer!\n");
+ stats[id].tx_no_bufs++;
return;
}
- auto *tx_data = mbuf->cast<uint32_t *>();
- tx_data[0] = fc_only ? stats[id].tx_seqno - 1 : stats[id].tx_seqno;
- tx_data[1] = stats[id].last_seqno;
+ uint32_t* tx_data;
+ size_t buff_size;
+ std::tie(tx_data, buff_size) = stream->buff_to_data(mbuf.get());
+ tx_data[0] = fc_only ? stats[id].tx_seqno - 1 : stats[id].tx_seqno;
+ tx_data[1] = stats[id].last_seqno;
if (!fc_only) {
- memset(&tx_data[2], stats[id].last_seqno, 8*BENCH_SPP);
- stats[id].tx_xfer += 8*BENCH_SPP;
+ memset(&tx_data[2], stats[id].last_seqno, 8 * BENCH_SPP);
+ stats[id].tx_xfer += 8 * BENCH_SPP;
}
- size_t num_bytes = 8 + (fc_only ? 0 : 8*BENCH_SPP);
- mbuf->commit(num_bytes);
- mbuf.reset();
+ size_t num_bytes = 8 + (fc_only ? 0 : 8 * BENCH_SPP);
+ stream->release_data_buff(mbuf, num_bytes / 4);
if (!fc_only) {
stats[id].tx_seqno++;
}
}
-static void bench(
- uhd::transport::dpdk_zero_copy::sptr *stream, uint32_t nb_ports, double timeout)
+static void bench(uhd::transport::mock_send_transport::sptr* tx_stream,
+ uhd::transport::mock_recv_transport::sptr* rx_stream,
+ uint32_t nb_ports,
+ double timeout)
{
uint64_t total_xfer[NUM_PORTS];
uint32_t id;
- struct dpdk_test_stats *stats = (struct dpdk_test_stats *) malloc(sizeof(*stats)*nb_ports);
+ struct dpdk_test_stats* stats =
+ (struct dpdk_test_stats*)malloc(sizeof(*stats) * nb_ports);
for (id = 0; id < nb_ports; id++) {
stats[id].tx_seqno = 1;
stats[id].tx_xfer = 0;
@@ -117,21 +126,20 @@ static void bench(
*/
uint64_t total_received = 0;
uint32_t consec_no_rx = 0;
- while ((total_received / nb_ports) < 10000000) { //&& consec_no_rx < 10000) {
+ while ((total_received / nb_ports) < 1000000) { //&& consec_no_rx < 10000) {
for (id = 0; id < nb_ports; id++) {
unsigned int nb_rx = 0;
- uhd::transport::managed_recv_buffer::sptr bufs[BURST_SIZE];
+ uhd::transport::frame_buff::uptr bufs[BURST_SIZE];
for (; nb_rx < BURST_SIZE; nb_rx++) {
- bufs[nb_rx] = stream[id]->get_recv_buff(timeout);
- if (bufs[nb_rx].get() == nullptr) {
- bufs[nb_rx].reset();
+ bufs[nb_rx] = rx_stream[id]->get_data_buff(timeout);
+ if (bufs[nb_rx] == nullptr) {
break;
}
}
if (nb_rx <= 0) {
if (timeout > 0.0) {
- send_udp(stream[id], id, true, stats);
+ send_udp(tx_stream[id], id, true, stats);
}
consec_no_rx++;
if (consec_no_rx >= 100000) {
@@ -148,9 +156,12 @@ static void bench(
}
for (unsigned int buf = 0; buf < nb_rx; buf++) {
- total_xfer[id] += bufs[buf]->size();
- auto data = bufs[buf]->cast<uint32_t *>();
+ total_xfer[id] += bufs[buf]->packet_size();
+ uint32_t* data;
+ size_t data_size;
+ std::tie(data, data_size) = rx_stream[id]->buff_to_data(bufs[buf].get());
process_udp(id, data, stats);
+ rx_stream[id]->release_data_buff(std::move(bufs[buf]));
}
total_received += nb_rx;
@@ -162,7 +173,7 @@ static void bench(
// uint32_t window_end = last_seqno[port] + TX_CREDITS;
if (window_end <= stats[id].tx_seqno) {
if (consec_no_rx == 9999) {
- send_udp(stream[id], id, true, stats);
+ send_udp(tx_stream[id], id, true, stats);
}
// send_udp(tx[id], id, true);
;
@@ -170,7 +181,7 @@ static void bench(
for (unsigned int pktno = 0;
(pktno < BURST_SIZE) && (stats[id].tx_seqno < window_end);
pktno++) {
- send_udp(stream[id], id, false, stats);
+ send_udp(tx_stream[id], id, false, stats);
}
}
}
@@ -183,18 +194,18 @@ static void bench(
printf("Bytes received = %ld\n", total_xfer[id]);
printf("Bytes sent = %ld\n", stats[id].tx_xfer);
printf("Time taken = %ld us\n",
- (bench_end.tv_sec - bench_start.tv_sec)*1000000
+ (bench_end.tv_sec - bench_start.tv_sec) * 1000000
+ (bench_end.tv_usec - bench_start.tv_usec));
- double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000
- + (bench_end.tv_usec - bench_start.tv_usec);
+ double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec) * 1000000
+ + (bench_end.tv_usec - bench_start.tv_usec);
elapsed_time *= 1.0e-6;
double elapsed_bytes = total_xfer[id];
- printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
+ printf("RX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time);
elapsed_bytes = stats[id].tx_xfer;
- printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time);
- uint32_t skt_drops = stream[id]->get_drop_count();
+ printf("TX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time);
+ // uint32_t skt_drops = stream[id]->get_drop_count();
printf("Dropped %d packets\n", stats[id].dropped_packets);
- printf("Socket reports dropped %d packets\n", skt_drops);
+ // printf("Socket reports dropped %d packets\n", skt_drops);
for (unsigned int i = 0; i < 16; i++) {
if (i >= stats[id].dropped_packets)
break;
@@ -221,14 +232,15 @@ static inline void set_cpu(pthread_t t, int cpu)
std::string get_ipv4_addr(unsigned int port_id)
{
- struct in_addr ipv4_addr;
- int status = uhd_dpdk_get_ipv4_addr(port_id, &ipv4_addr.s_addr, NULL);
- UHD_ASSERT_THROW(status == 0);
+ auto ctx = uhd::transport::dpdk::dpdk_ctx::get();
+ auto port = ctx->get_port(port_id);
+ auto ip = port->get_ipv4();
char addr_str[INET_ADDRSTRLEN];
- inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str));
+ inet_ntop(AF_INET, &ip, addr_str, sizeof(addr_str));
return std::string(addr_str);
}
+/*
void *prepare_and_bench_blocking(void *arg)
{
struct dpdk_test_args *args = (struct dpdk_test_args *) arg;
@@ -237,7 +249,7 @@ void *prepare_and_bench_blocking(void *arg)
set_cpu(t, args->cpu);
args->started = true;
pthread_cond_wait(args->cond, &args->mutex);
- auto& ctx = uhd::transport::uhd_dpdk_ctx::get();
+ auto ctx = uhd::transport::dpdk_ctx::get();
uhd::transport::dpdk_zero_copy::sptr eth_data[1];
uhd::transport::zero_copy_xport_params buff_args;
buff_args.recv_frame_size = 8000;
@@ -258,53 +270,57 @@ void *prepare_and_bench_blocking(void *arg)
bench(eth_data, 1, 0.1);
return 0;
}
+*/
void prepare_and_bench_polling(void)
{
- auto& ctx = uhd::transport::uhd_dpdk_ctx::get();
+ auto ctx = uhd::transport::dpdk::dpdk_ctx::get();
- uhd::transport::dpdk_zero_copy::sptr eth_data[NUM_PORTS];
- uhd::transport::zero_copy_xport_params buff_args;
+ uhd::transport::udp_dpdk_link::sptr eth_data[NUM_PORTS];
+ uhd::transport::mock_send_transport::sptr tx_strm[NUM_PORTS];
+ uhd::transport::mock_recv_transport::sptr rx_strm[NUM_PORTS];
+ uhd::transport::link_params_t buff_args;
buff_args.recv_frame_size = 8000;
buff_args.send_frame_size = 8000;
- buff_args.num_send_frames = 8;
- buff_args.num_recv_frames = 8;
+ buff_args.num_send_frames = 32;
+ buff_args.num_recv_frames = 32;
auto dev_addr = uhd::device_addr_t();
- eth_data[0] = uhd::transport::dpdk_zero_copy::make(
- ctx,
- 0,
- get_ipv4_addr(1),
- "48888",
- "48888",
- buff_args,
- dev_addr
- );
- eth_data[1] = uhd::transport::dpdk_zero_copy::make(
- ctx,
- 1,
- get_ipv4_addr(0),
- "48888",
- "48888",
- buff_args,
- dev_addr
- );
-
- bench(eth_data, NUM_PORTS, 0.0);
+ eth_data[0] = uhd::transport::udp_dpdk_link::make(
+ 0, get_ipv4_addr(1), "48888", "48888", buff_args);
+ eth_data[1] = uhd::transport::udp_dpdk_link::make(
+ 1, get_ipv4_addr(0), "48888", "48888", buff_args);
+ auto io_srv0 = ctx->get_io_service(0);
+ io_srv0->attach_send_link(eth_data[0]);
+ io_srv0->attach_recv_link(eth_data[0]);
+ auto io_srv1 = ctx->get_io_service(1);
+ io_srv1->attach_send_link(eth_data[1]);
+ io_srv1->attach_recv_link(eth_data[1]);
+ tx_strm[0] = std::make_shared<uhd::transport::mock_send_transport>(
+ io_srv0, eth_data[0], eth_data[0], 0, 0, 32);
+ rx_strm[0] = std::make_shared<uhd::transport::mock_recv_transport>(
+ io_srv0, eth_data[0], eth_data[0], 1, 1, 32);
+ tx_strm[1] = std::make_shared<uhd::transport::mock_send_transport>(
+ io_srv1, eth_data[1], eth_data[1], 1, 1, 32);
+ rx_strm[1] = std::make_shared<uhd::transport::mock_recv_transport>(
+ io_srv1, eth_data[1], eth_data[1], 0, 0, 32);
+
+
+ bench(tx_strm, rx_strm, NUM_PORTS, 0.0);
}
-int main(int argc, char **argv)
+int main(int argc, char** argv)
{
int retval, user0_cpu = 0, user1_cpu = 2;
int status = 0;
std::string args;
std::string cpusets;
po::options_description desc("Allowed options");
- desc.add_options()
- ("help", "help message")
- ("args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args")
- ("polling-mode", "Use polling mode (single thread on own core)")
- ("cpusets", po::value<std::string>(&cpusets)->default_value(""), "which core(s) to use for a given thread in blocking mode (specify something like \"user0=0,user1=2\")")
- ;
+ desc.add_options()("help", "help message")(
+ "args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args")(
+ "polling-mode", "Use polling mode (single thread on own core)")("cpusets",
+ po::value<std::string>(&cpusets)->default_value(""),
+ "which core(s) to use for a given thread in blocking mode (specify something "
+ "like \"user0=0,user1=2\")");
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
@@ -314,9 +330,9 @@ int main(int argc, char **argv)
return 0;
}
- auto dpdk_args = uhd::device_addr_t(args);
+ auto dpdk_args = uhd::device_addr_t(args);
- auto cpuset_map = uhd::device_addr_t(cpusets);
+ auto cpuset_map = uhd::device_addr_t(cpusets);
for (std::string& key : cpuset_map.keys()) {
if (key == "user0") {
user0_cpu = std::stoi(cpuset_map[key], NULL, 0);
@@ -325,12 +341,12 @@ int main(int argc, char **argv)
}
}
- auto& ctx = uhd::transport::uhd_dpdk_ctx::get();
- ctx.init(args);
+ auto ctx = uhd::transport::dpdk::dpdk_ctx::get();
+ ctx->init(args);
if (vm.count("polling-mode")) {
prepare_and_bench_polling();
- } else {
+ } /*else {
pthread_cond_t cond;
pthread_cond_init(&cond, NULL);
struct dpdk_test_args bench_args[2];
@@ -379,7 +395,7 @@ int main(int argc, char **argv)
perror("Error while joining thread");
return status;
}
- }
+ }*/
return status;
}