diff options
-rw-r--r-- | host/tests/CMakeLists.txt | 16 | ||||
-rw-r--r-- | host/tests/common/mock_transport.hpp | 14 | ||||
-rw-r--r-- | host/tests/dpdk_test.cpp | 174 |
3 files changed, 121 insertions, 83 deletions
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 2c53e4905..d25c88400 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -118,6 +118,22 @@ endmacro(UHD_ADD_NONAPI_TEST) if(ENABLE_DPDK) find_package(DPDK) UHD_ADD_NONAPI_TEST( + TARGET "dpdk_test.cpp" + EXTRA_SOURCES + ${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp + ${CMAKE_SOURCE_DIR}/lib/utils/paths.cpp + ${CMAKE_SOURCE_DIR}/lib/utils/pathslib.cpp + ${CMAKE_SOURCE_DIR}/lib/utils/prefs.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/adapter.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_common.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/uhd-dpdk/dpdk_io_service.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/udp_dpdk_link.cpp + INCLUDE_DIRS + ${DPDK_INCLUDE_DIR} + EXTRA_LIBS ${DPDK_LIBRARIES} + NOAUTORUN # Don't register for auto-run, it requires special config + ) + UHD_ADD_NONAPI_TEST( TARGET "dpdk_port_test.cpp" EXTRA_SOURCES ${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp diff --git a/host/tests/common/mock_transport.hpp b/host/tests/common/mock_transport.hpp index a8dc761f1..6ca48ee5f 100644 --- a/host/tests/common/mock_transport.hpp +++ b/host/tests/common/mock_transport.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP #define INCLUDED_UHDLIB_TRANSPORT_TRANSPORT_IF_HPP +#include <uhd/exception.hpp> #include <uhdlib/transport/io_service.hpp> #include <boost/lockfree/spsc_queue.hpp> #include <utility> @@ -39,8 +40,7 @@ public: uint16_t dst_addr, uint16_t src_addr, uint32_t credits) - : _credits(credits) - , _frame_size(send_link->get_send_frame_size()) + : _credits(credits), _frame_size(send_link->get_send_frame_size()) { _send_addr = (dst_addr << 16) | (src_addr << 0); _recv_addr = (src_addr << 16) | (dst_addr << 0); @@ -66,8 +66,8 @@ public: send_link_if* /*send_link*/) { return this->recv_buff(buff, link); }; - send_io_if::fc_callback_t fc_cb = [this](size_t) { - return this->_seqno < this->_ackno + this->_credits; + send_io_if::fc_callback_t fc_cb = [this](const size_t bytes) { + return this->can_send(bytes); }; /* Pretend get 1 flow control message per sent packet */ @@ -156,6 +156,11 @@ public: _seqno++; } + bool can_send(size_t bytes) + { + return _seqno < _ackno + _credits; + }; + /*! * Callback for when packets are received (for processing). * Function should make a determination of whether the packet belongs to it @@ -312,6 +317,7 @@ public: UHD_ASSERT_THROW(buff == nullptr); fc_data[TYPE_OFFSET] = 1; /* FC type */ fc_data[ADDR_OFFSET] = _send_addr; + fc_buff->set_packet_size(3 * sizeof(uint32_t)); send_link->release_send_buff(std::move(fc_buff)); } else { recv_link->release_recv_buff(std::move(buff)); diff --git a/host/tests/dpdk_test.cpp b/host/tests/dpdk_test.cpp index c32e47824..35348207e 100644 --- a/host/tests/dpdk_test.cpp +++ b/host/tests/dpdk_test.cpp @@ -8,8 +8,11 @@ */ -#include <uhdlib/transport/dpdk_zero_copy.hpp> -#include <uhdlib/transport/uhd-dpdk.h> +#include "common/mock_transport.hpp" +#include <uhdlib/transport/dpdk/common.hpp> +#include <uhdlib/transport/dpdk/service_queue.hpp> +#include <uhdlib/transport/dpdk_io_service.hpp> +#include <uhdlib/transport/udp_dpdk_link.hpp> #include <arpa/inet.h> #include <errno.h> #include <sched.h> @@ -39,7 +42,7 @@ struct dpdk_test_args { unsigned int portid; std::string dst_ip; - pthread_cond_t *cond; + pthread_cond_t* cond; pthread_mutex_t mutex; bool started; int cpu; @@ -54,10 +57,11 @@ struct dpdk_test_stats uint32_t last_ackno; uint32_t tx_seqno; uint64_t tx_xfer; + uint32_t tx_no_bufs; }; -static void process_udp(int id, uint32_t *udp_data, struct dpdk_test_stats *stats) +static void process_udp(int id, uint32_t* udp_data, struct dpdk_test_stats* stats) { if (udp_data[0] != stats[id].last_seqno + 1) { stats[id].lasts[stats[id].dropped_packets & 0xf] = stats[id].last_seqno; @@ -69,38 +73,43 @@ static void process_udp(int id, uint32_t *udp_data, struct dpdk_test_stats *stat stats[id].last_ackno = udp_data[1]; } -static void send_udp(uhd::transport::dpdk_zero_copy::sptr& stream, +static void send_udp(uhd::transport::mock_send_transport::sptr& stream, int id, bool fc_only, - struct dpdk_test_stats *stats) + struct dpdk_test_stats* stats) { - uhd::transport::managed_send_buffer::sptr mbuf = stream->get_send_buff(0); - if (mbuf.get() == nullptr) { + uhd::transport::frame_buff::uptr mbuf = stream->get_data_buff(0); + if (!mbuf) { printf("Could not get TX buffer!\n"); + stats[id].tx_no_bufs++; return; } - auto *tx_data = mbuf->cast<uint32_t *>(); - tx_data[0] = fc_only ? stats[id].tx_seqno - 1 : stats[id].tx_seqno; - tx_data[1] = stats[id].last_seqno; + uint32_t* tx_data; + size_t buff_size; + std::tie(tx_data, buff_size) = stream->buff_to_data(mbuf.get()); + tx_data[0] = fc_only ? stats[id].tx_seqno - 1 : stats[id].tx_seqno; + tx_data[1] = stats[id].last_seqno; if (!fc_only) { - memset(&tx_data[2], stats[id].last_seqno, 8*BENCH_SPP); - stats[id].tx_xfer += 8*BENCH_SPP; + memset(&tx_data[2], stats[id].last_seqno, 8 * BENCH_SPP); + stats[id].tx_xfer += 8 * BENCH_SPP; } - size_t num_bytes = 8 + (fc_only ? 0 : 8*BENCH_SPP); - mbuf->commit(num_bytes); - mbuf.reset(); + size_t num_bytes = 8 + (fc_only ? 0 : 8 * BENCH_SPP); + stream->release_data_buff(mbuf, num_bytes / 4); if (!fc_only) { stats[id].tx_seqno++; } } -static void bench( - uhd::transport::dpdk_zero_copy::sptr *stream, uint32_t nb_ports, double timeout) +static void bench(uhd::transport::mock_send_transport::sptr* tx_stream, + uhd::transport::mock_recv_transport::sptr* rx_stream, + uint32_t nb_ports, + double timeout) { uint64_t total_xfer[NUM_PORTS]; uint32_t id; - struct dpdk_test_stats *stats = (struct dpdk_test_stats *) malloc(sizeof(*stats)*nb_ports); + struct dpdk_test_stats* stats = + (struct dpdk_test_stats*)malloc(sizeof(*stats) * nb_ports); for (id = 0; id < nb_ports; id++) { stats[id].tx_seqno = 1; stats[id].tx_xfer = 0; @@ -117,21 +126,20 @@ static void bench( */ uint64_t total_received = 0; uint32_t consec_no_rx = 0; - while ((total_received / nb_ports) < 10000000) { //&& consec_no_rx < 10000) { + while ((total_received / nb_ports) < 1000000) { //&& consec_no_rx < 10000) { for (id = 0; id < nb_ports; id++) { unsigned int nb_rx = 0; - uhd::transport::managed_recv_buffer::sptr bufs[BURST_SIZE]; + uhd::transport::frame_buff::uptr bufs[BURST_SIZE]; for (; nb_rx < BURST_SIZE; nb_rx++) { - bufs[nb_rx] = stream[id]->get_recv_buff(timeout); - if (bufs[nb_rx].get() == nullptr) { - bufs[nb_rx].reset(); + bufs[nb_rx] = rx_stream[id]->get_data_buff(timeout); + if (bufs[nb_rx] == nullptr) { break; } } if (nb_rx <= 0) { if (timeout > 0.0) { - send_udp(stream[id], id, true, stats); + send_udp(tx_stream[id], id, true, stats); } consec_no_rx++; if (consec_no_rx >= 100000) { @@ -148,9 +156,12 @@ static void bench( } for (unsigned int buf = 0; buf < nb_rx; buf++) { - total_xfer[id] += bufs[buf]->size(); - auto data = bufs[buf]->cast<uint32_t *>(); + total_xfer[id] += bufs[buf]->packet_size(); + uint32_t* data; + size_t data_size; + std::tie(data, data_size) = rx_stream[id]->buff_to_data(bufs[buf].get()); process_udp(id, data, stats); + rx_stream[id]->release_data_buff(std::move(bufs[buf])); } total_received += nb_rx; @@ -162,7 +173,7 @@ static void bench( // uint32_t window_end = last_seqno[port] + TX_CREDITS; if (window_end <= stats[id].tx_seqno) { if (consec_no_rx == 9999) { - send_udp(stream[id], id, true, stats); + send_udp(tx_stream[id], id, true, stats); } // send_udp(tx[id], id, true); ; @@ -170,7 +181,7 @@ static void bench( for (unsigned int pktno = 0; (pktno < BURST_SIZE) && (stats[id].tx_seqno < window_end); pktno++) { - send_udp(stream[id], id, false, stats); + send_udp(tx_stream[id], id, false, stats); } } } @@ -183,18 +194,18 @@ static void bench( printf("Bytes received = %ld\n", total_xfer[id]); printf("Bytes sent = %ld\n", stats[id].tx_xfer); printf("Time taken = %ld us\n", - (bench_end.tv_sec - bench_start.tv_sec)*1000000 + (bench_end.tv_sec - bench_start.tv_sec) * 1000000 + (bench_end.tv_usec - bench_start.tv_usec)); - double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec)*1000000 - + (bench_end.tv_usec - bench_start.tv_usec); + double elapsed_time = (bench_end.tv_sec - bench_start.tv_sec) * 1000000 + + (bench_end.tv_usec - bench_start.tv_usec); elapsed_time *= 1.0e-6; double elapsed_bytes = total_xfer[id]; - printf("RX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); + printf("RX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time); elapsed_bytes = stats[id].tx_xfer; - printf("TX Performance = %e Gbps\n", elapsed_bytes*8.0/1.0e9/elapsed_time); - uint32_t skt_drops = stream[id]->get_drop_count(); + printf("TX Performance = %e Gbps\n", elapsed_bytes * 8.0 / 1.0e9 / elapsed_time); + // uint32_t skt_drops = stream[id]->get_drop_count(); printf("Dropped %d packets\n", stats[id].dropped_packets); - printf("Socket reports dropped %d packets\n", skt_drops); + // printf("Socket reports dropped %d packets\n", skt_drops); for (unsigned int i = 0; i < 16; i++) { if (i >= stats[id].dropped_packets) break; @@ -221,14 +232,15 @@ static inline void set_cpu(pthread_t t, int cpu) std::string get_ipv4_addr(unsigned int port_id) { - struct in_addr ipv4_addr; - int status = uhd_dpdk_get_ipv4_addr(port_id, &ipv4_addr.s_addr, NULL); - UHD_ASSERT_THROW(status == 0); + auto ctx = uhd::transport::dpdk::dpdk_ctx::get(); + auto port = ctx->get_port(port_id); + auto ip = port->get_ipv4(); char addr_str[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &ipv4_addr, addr_str, sizeof(addr_str)); + inet_ntop(AF_INET, &ip, addr_str, sizeof(addr_str)); return std::string(addr_str); } +/* void *prepare_and_bench_blocking(void *arg) { struct dpdk_test_args *args = (struct dpdk_test_args *) arg; @@ -237,7 +249,7 @@ void *prepare_and_bench_blocking(void *arg) set_cpu(t, args->cpu); args->started = true; pthread_cond_wait(args->cond, &args->mutex); - auto& ctx = uhd::transport::uhd_dpdk_ctx::get(); + auto ctx = uhd::transport::dpdk_ctx::get(); uhd::transport::dpdk_zero_copy::sptr eth_data[1]; uhd::transport::zero_copy_xport_params buff_args; buff_args.recv_frame_size = 8000; @@ -258,53 +270,57 @@ void *prepare_and_bench_blocking(void *arg) bench(eth_data, 1, 0.1); return 0; } +*/ void prepare_and_bench_polling(void) { - auto& ctx = uhd::transport::uhd_dpdk_ctx::get(); + auto ctx = uhd::transport::dpdk::dpdk_ctx::get(); - uhd::transport::dpdk_zero_copy::sptr eth_data[NUM_PORTS]; - uhd::transport::zero_copy_xport_params buff_args; + uhd::transport::udp_dpdk_link::sptr eth_data[NUM_PORTS]; + uhd::transport::mock_send_transport::sptr tx_strm[NUM_PORTS]; + uhd::transport::mock_recv_transport::sptr rx_strm[NUM_PORTS]; + uhd::transport::link_params_t buff_args; buff_args.recv_frame_size = 8000; buff_args.send_frame_size = 8000; - buff_args.num_send_frames = 8; - buff_args.num_recv_frames = 8; + buff_args.num_send_frames = 32; + buff_args.num_recv_frames = 32; auto dev_addr = uhd::device_addr_t(); - eth_data[0] = uhd::transport::dpdk_zero_copy::make( - ctx, - 0, - get_ipv4_addr(1), - "48888", - "48888", - buff_args, - dev_addr - ); - eth_data[1] = uhd::transport::dpdk_zero_copy::make( - ctx, - 1, - get_ipv4_addr(0), - "48888", - "48888", - buff_args, - dev_addr - ); - - bench(eth_data, NUM_PORTS, 0.0); + eth_data[0] = uhd::transport::udp_dpdk_link::make( + 0, get_ipv4_addr(1), "48888", "48888", buff_args); + eth_data[1] = uhd::transport::udp_dpdk_link::make( + 1, get_ipv4_addr(0), "48888", "48888", buff_args); + auto io_srv0 = ctx->get_io_service(0); + io_srv0->attach_send_link(eth_data[0]); + io_srv0->attach_recv_link(eth_data[0]); + auto io_srv1 = ctx->get_io_service(1); + io_srv1->attach_send_link(eth_data[1]); + io_srv1->attach_recv_link(eth_data[1]); + tx_strm[0] = std::make_shared<uhd::transport::mock_send_transport>( + io_srv0, eth_data[0], eth_data[0], 0, 0, 32); + rx_strm[0] = std::make_shared<uhd::transport::mock_recv_transport>( + io_srv0, eth_data[0], eth_data[0], 1, 1, 32); + tx_strm[1] = std::make_shared<uhd::transport::mock_send_transport>( + io_srv1, eth_data[1], eth_data[1], 1, 1, 32); + rx_strm[1] = std::make_shared<uhd::transport::mock_recv_transport>( + io_srv1, eth_data[1], eth_data[1], 0, 0, 32); + + + bench(tx_strm, rx_strm, NUM_PORTS, 0.0); } -int main(int argc, char **argv) +int main(int argc, char** argv) { int retval, user0_cpu = 0, user1_cpu = 2; int status = 0; std::string args; std::string cpusets; po::options_description desc("Allowed options"); - desc.add_options() - ("help", "help message") - ("args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args") - ("polling-mode", "Use polling mode (single thread on own core)") - ("cpusets", po::value<std::string>(&cpusets)->default_value(""), "which core(s) to use for a given thread in blocking mode (specify something like \"user0=0,user1=2\")") - ; + desc.add_options()("help", "help message")( + "args", po::value<std::string>(&args)->default_value(""), "UHD-DPDK args")( + "polling-mode", "Use polling mode (single thread on own core)")("cpusets", + po::value<std::string>(&cpusets)->default_value(""), + "which core(s) to use for a given thread in blocking mode (specify something " + "like \"user0=0,user1=2\")"); po::variables_map vm; po::store(po::parse_command_line(argc, argv, desc), vm); po::notify(vm); @@ -314,9 +330,9 @@ int main(int argc, char **argv) return 0; } - auto dpdk_args = uhd::device_addr_t(args); + auto dpdk_args = uhd::device_addr_t(args); - auto cpuset_map = uhd::device_addr_t(cpusets); + auto cpuset_map = uhd::device_addr_t(cpusets); for (std::string& key : cpuset_map.keys()) { if (key == "user0") { user0_cpu = std::stoi(cpuset_map[key], NULL, 0); @@ -325,12 +341,12 @@ int main(int argc, char **argv) } } - auto& ctx = uhd::transport::uhd_dpdk_ctx::get(); - ctx.init(args); + auto ctx = uhd::transport::dpdk::dpdk_ctx::get(); + ctx->init(args); if (vm.count("polling-mode")) { prepare_and_bench_polling(); - } else { + } /*else { pthread_cond_t cond; pthread_cond_init(&cond, NULL); struct dpdk_test_args bench_args[2]; @@ -379,7 +395,7 @@ int main(int argc, char **argv) perror("Error while joining thread"); return status; } - } + }*/ return status; } |