diff options
Diffstat (limited to 'host/lib/transport/uhd-dpdk/dpdk_io_service.cpp')
-rw-r--r-- | host/lib/transport/uhd-dpdk/dpdk_io_service.cpp | 950 |
1 files changed, 950 insertions, 0 deletions
diff --git a/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp new file mode 100644 index 000000000..1fcedca51 --- /dev/null +++ b/host/lib/transport/uhd-dpdk/dpdk_io_service.cpp @@ -0,0 +1,950 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/transport/dpdk/arp.hpp> +#include <uhdlib/transport/dpdk/udp.hpp> +#include <uhdlib/transport/dpdk_io_service_client.hpp> +#include <uhdlib/utils/narrow.hpp> +#include <cmath> + +/* + * Memory management + * + * Every object that allocates and frees DPDK memory has a reference to the + * dpdk_ctx. + * + * Ownership hierarchy: + * + * dpdk_io_service_mgr (1) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * xport (1) => + * dpdk_send_io::sptr + * dpdk_recv_io::sptr + * + * usrp link_mgr (1) => + * udp_dpdk_link::sptr + * + * dpdk_send_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_recv_io (2) => + * dpdk_ctx::sptr + * dpdk_io_service::sptr + * + * dpdk_io_service (3) => + * dpdk_ctx::wptr (weak_ptr) + * udp_dpdk_link::sptr + * + * udp_dpdk_link (4) => + * dpdk_ctx::sptr + */ + +using namespace uhd::transport; + +dpdk_io_service::dpdk_io_service( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) + : _ctx(dpdk::dpdk_ctx::get()) + , _lcore_id(lcore_id) + , _ports(ports) + , _servq(servq_depth, lcore_id) +{ + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Launching I/O service for lcore " << lcore_id); + for (auto port : _ports) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "lcore_id " << lcore_id << ": Adding port index " << port->get_port_id()); + _tx_queues[port->get_port_id()] = std::list<dpdk_send_io*>(); + _recv_xport_map[port->get_port_id()] = std::list<dpdk_recv_io*>(); + } + int status = rte_eal_remote_launch(_io_worker, this, lcore_id); + if (status) { + throw uhd::runtime_error("DPDK: I/O service cannot launch on busy lcore"); + } +} + +dpdk_io_service::sptr dpdk_io_service::make( + unsigned int lcore_id, std::vector<dpdk::dpdk_port*> ports, size_t servq_depth) +{ + return dpdk_io_service::sptr(new dpdk_io_service(lcore_id, ports, servq_depth)); +} + +dpdk_io_service::~dpdk_io_service() +{ + UHD_LOG_TRACE( + "DPDK::IO_SERVICE", "Shutting down I/O service for lcore " << _lcore_id); + dpdk::wait_req* req = dpdk::wait_req_alloc(dpdk::wait_type::WAIT_LCORE_TERM, NULL); + if (!req) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "Could not allocate request for lcore termination for lcore " << _lcore_id); + return; + } + dpdk::wait_req_get(req); + _servq.submit(req, std::chrono::microseconds(-1)); + dpdk::wait_req_put(req); +} + +void dpdk_io_service::attach_recv_link(recv_link_if::sptr link) +{ + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link.get()); + data.is_recv = true; + assert(data.link); + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + dpdk::wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.push_back(link); + } +} + +void dpdk_io_service::attach_send_link(send_link_if::sptr link) +{ + udp_dpdk_link* dpdk_link = dynamic_cast<udp_dpdk_link*>(link.get()); + assert(dpdk_link); + + // First, fill in destination MAC address + struct dpdk::arp_request arp_data; + arp_data.tpa = dpdk_link->get_remote_ipv4(); + arp_data.port = dpdk_link->get_port()->get_port_id(); + if (dpdk_link->get_port()->dst_is_broadcast(arp_data.tpa)) { + // If a broadcast IP, skip the ARP and fill with broadcast MAC addr + memset(arp_data.tha.addr_bytes, 0xFF, 6); + } else { + auto arp_req = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (!arp_req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req for ARP request"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req for ARP request"); + } + if (_servq.submit(arp_req, std::chrono::microseconds(3000000))) { + // Try one more time... + auto arp_req2 = wait_req_alloc(dpdk::wait_type::WAIT_ARP, (void*)&arp_data); + if (_servq.submit(arp_req2, std::chrono::microseconds(30000000))) { + wait_req_put(arp_req); + wait_req_put(arp_req2); + throw uhd::io_error("DPDK: Could not reach host"); + } + wait_req_put(arp_req2); + } + wait_req_put(arp_req); + } + dpdk_link->set_remote_mac(arp_data.tha); + + // Then, submit the link to the I/O service thread + struct dpdk_flow_data data; + data.link = dpdk_link; + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_OPEN, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to attach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to attach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.push_back(link); + } +} + +void dpdk_io_service::detach_recv_link(recv_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = true; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to detach recv_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach recv_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _recv_links.remove_if( + [link_ptr](recv_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +void dpdk_io_service::detach_send_link(send_link_if::sptr link) +{ + auto link_ptr = link.get(); + struct dpdk_flow_data data; + data.link = dynamic_cast<udp_dpdk_link*>(link_ptr); + data.is_recv = false; + auto req = wait_req_alloc(dpdk::wait_type::WAIT_FLOW_CLOSE, (void*)&data); + if (!req) { + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Could not allocate wait_req to detach send_link"); + throw uhd::runtime_error("DPDK: Could not allocate wait_req to detach send_link"); + } + _servq.submit(req, std::chrono::microseconds(-1)); + wait_req_put(req); + { + std::lock_guard<std::mutex> lock(_mutex); + _send_links.remove_if( + [link_ptr](send_link_if::sptr& item) { return item.get() == link_ptr; }); + } +} + +recv_io_if::sptr dpdk_io_service::make_recv_client(recv_link_if::sptr data_link, + size_t num_recv_frames, + recv_callback_t cb, + send_link_if::sptr /*fc_link*/, + size_t num_send_frames, + recv_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(data_link.get()); + auto recv_io = std::make_shared<dpdk_recv_io>( + shared_from_this(), link, num_recv_frames, cb, num_send_frames, fc_cb); + + // Register with I/O service + recv_io->_dpdk_io_if.io_client = static_cast<void*>(recv_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&recv_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return recv_io; +} + +send_io_if::sptr dpdk_io_service::make_send_client(send_link_if::sptr send_link, + size_t num_send_frames, + send_io_if::send_callback_t send_cb, + recv_link_if::sptr /*recv_link*/, + size_t num_recv_frames, + recv_callback_t recv_cb, + send_io_if::fc_callback_t fc_cb) +{ + auto link = dynamic_cast<udp_dpdk_link*>(send_link.get()); + auto send_io = std::make_shared<dpdk_send_io>(shared_from_this(), + link, + num_send_frames, + send_cb, + num_recv_frames, + recv_cb, + fc_cb); + + // Register with I/O service + send_io->_dpdk_io_if.io_client = static_cast<void*>(send_io.get()); + auto xport_req = dpdk::wait_req_alloc( + dpdk::wait_type::WAIT_XPORT_CONNECT, (void*)&send_io->_dpdk_io_if); + _servq.submit(xport_req, std::chrono::microseconds(-1)); + wait_req_put(xport_req); + return send_io; +} + + +int dpdk_io_service::_io_worker(void* arg) +{ + if (!arg) + return -EINVAL; + dpdk_io_service* srv = (dpdk_io_service*)arg; + + /* Check that this is a valid lcore */ + unsigned int lcore_id = rte_lcore_id(); + if (lcore_id == LCORE_ID_ANY) + return -ENODEV; + + /* Check that this lcore has ports */ + if (srv->_ports.size() == 0) + return -ENODEV; + + char name[16]; + snprintf(name, sizeof(name), "dpdk-io_%hu", (uint16_t)lcore_id); + rte_thread_setname(pthread_self(), name); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "I/O service thread '" << name << "' started on lcore " << lcore_id); + + uhd::set_thread_priority_safe(); + + snprintf(name, sizeof(name), "rx-tbl_%hu", (uint16_t)lcore_id); + struct rte_hash_parameters hash_params = {.name = name, + .entries = MAX_FLOWS, + .reserved = 0, + .key_len = sizeof(struct dpdk::ipv4_5tuple), + .hash_func = NULL, + .hash_func_init_val = 0, + .socket_id = uhd::narrow_cast<int>(rte_socket_id()), + .extra_flag = 0}; + srv->_rx_table = rte_hash_create(&hash_params); + if (srv->_rx_table == NULL) { + return rte_errno; + } + + int status = 0; + while (!status) { + /* For each port, attempt to receive packets and process */ + for (auto port : srv->_ports) { + srv->_rx_burst(port, 0); + } + /* For each port's TX queues, do TX */ + for (auto port : srv->_ports) { + srv->_tx_burst(port); + } + /* For each port's RX release queues, release buffers */ + for (auto port : srv->_ports) { + srv->_rx_release(port); + } + /* Retry waking clients */ + if (srv->_retry_head) { + dpdk_io_if* node = srv->_retry_head; + dpdk_io_if* end = srv->_retry_head->prev; + while (true) { + dpdk_io_if* next = node->next; + srv->_wake_client(node); + if (node == end) { + break; + } else { + node = next; + next = node->next; + } + } + } + /* Check for open()/close()/term() requests and service 1 at a time + * Leave this last so we immediately terminate if requested + */ + status = srv->_service_requests(); + } + + return status; +} + +int dpdk_io_service::_service_requests() +{ + for (int i = 0; i < MAX_PENDING_SERVICE_REQS; i++) { + /* Dequeue */ + dpdk::wait_req* req = _servq.pop(); + if (!req) { + break; + } + switch (req->reason) { + case dpdk::wait_type::WAIT_SIMPLE: + while (_servq.complete(req) == -ENOBUFS) + ; + break; + case dpdk::wait_type::WAIT_RX: + case dpdk::wait_type::WAIT_TX_BUF: + throw uhd::not_implemented_error( + "DPDK: _service_requests(): DPDK is still a WIP"); + case dpdk::wait_type::WAIT_FLOW_OPEN: + _service_flow_open(req); + break; + case dpdk::wait_type::WAIT_FLOW_CLOSE: + _service_flow_close(req); + break; + case dpdk::wait_type::WAIT_XPORT_CONNECT: + _service_xport_connect(req); + break; + case dpdk::wait_type::WAIT_XPORT_DISCONNECT: + _service_xport_disconnect(req); + break; + case dpdk::wait_type::WAIT_ARP: { + assert(req->data != NULL); + int arp_status = _service_arp_request(req); + assert(arp_status != -ENOMEM); + if (arp_status == 0) { + while (_servq.complete(req) == -ENOBUFS) + ; + } + break; + } + case dpdk::wait_type::WAIT_LCORE_TERM: + rte_free(_rx_table); + while (_servq.complete(req) == -ENOBUFS) + ; + // Return a positive value to indicate we should terminate + return 1; + default: + UHD_LOG_ERROR( + "DPDK::IO_SERVICE", "Invalid reason associated with wait request"); + while (_servq.complete(req) == -ENOBUFS) + ; + break; + } + } + return 0; +} + +void dpdk_io_service::_service_flow_open(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, add to RX table. Currently, nothing to do for TX. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + // Check the UDP port isn't in use + if (rte_hash_lookup(_rx_table, &ht_key) > 0) { + req->retval = -EADDRINUSE; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add xport list for this UDP port + auto rx_entry = new std::list<dpdk_io_if*>(); + if (rte_hash_add_key_data(_rx_table, &ht_key, rx_entry)) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Could not add new RX list to table"); + delete rx_entry; + req->retval = -ENOMEM; + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_flow_close(dpdk::wait_req* req) +{ + auto flow_req_data = (struct dpdk_flow_data*)req->data; + assert(flow_req_data); + if (flow_req_data->is_recv) { + // If RX, remove from RX table. Currently, nothing to do for TX. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = flow_req_data->link->get_port()->get_ipv4(), + .src_port = 0, + .dst_port = flow_req_data->link->get_local_port()}; + std::list<dpdk_io_if*>* xport_list; + + if (rte_hash_lookup_data(_rx_table, &ht_key, (void**)&xport_list) > 0) { + UHD_ASSERT_THROW(xport_list->empty()); + delete xport_list; + rte_hash_del_key(_rx_table, &ht_key); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_connect(dpdk::wait_req* req) +{ + auto dpdk_io = static_cast<dpdk_io_if*>(req->data); + UHD_ASSERT_THROW(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Add to RX table only if have a callback. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + req->retval = -ENOENT; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot add xport to RX table"); + while (_servq.complete(req) == -ENOBUFS) + ; + return; + } + // Add to xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->push_back(dpdk_io); + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX connect request..."); + // Add to xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.push_back((dpdk_recv_io*)dpdk_io->io_client); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX connect request..."); + dpdk_send_io* send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Add to xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.push_back(send_io); + for (size_t i = 0; i < send_io->_num_send_frames; i++) { + auto buff_ptr = + (dpdk::dpdk_frame_buff*)dpdk_io->link->get_send_buff(0).release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + break; + } + if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + break; + } + send_io->_num_frames_in_use++; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +void dpdk_io_service::_service_xport_disconnect(dpdk::wait_req* req) +{ + auto dpdk_io = (struct dpdk_io_if*)req->data; + assert(dpdk_io); + auto port = dpdk_io->link->get_port(); + if (dpdk_io->recv_cb) { + // Remove from RX table only if have a callback. + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = dpdk_io->link->get_local_port()}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) >= 0) { + // Remove from xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + rx_entry->remove(dpdk_io); + } else { + req->retval = -EINVAL; + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Cannot remove xport from RX table"); + } + } + if (dpdk_io->is_recv) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing RX disconnect request..."); + dpdk_recv_io* recv_client = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _recv_xport_map.at(port->get_port_id()); + xport_list.remove(recv_client); + while (!rte_ring_empty(recv_client->_recv_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_recv_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(recv_client->_release_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(recv_client->_release_queue, (void**)&buff_ptr); + dpdk_io->link->release_recv_buff(frame_buff::uptr(buff_ptr)); + } + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Servicing TX disconnect request..."); + dpdk_send_io* send_client = static_cast<dpdk_send_io*>(dpdk_io->io_client); + // Remove from xport list for this NIC port + auto& xport_list = _tx_queues.at(port->get_port_id()); + xport_list.remove(send_client); + while (!rte_ring_empty(send_client->_send_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_send_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + while (!rte_ring_empty(send_client->_buffer_queue)) { + frame_buff* buff_ptr; + rte_ring_dequeue(send_client->_buffer_queue, (void**)&buff_ptr); + dpdk_io->link->release_send_buff(frame_buff::uptr(buff_ptr)); + } + } + // Now remove the node if it's on the retry list + if ((_retry_head == dpdk_io) && (dpdk_io->next == dpdk_io)) { + _retry_head = NULL; + } else if (_retry_head) { + dpdk_io_if* node = _retry_head->next; + while (node != _retry_head) { + if (node == dpdk_io) { + dpdk_io->prev->next = dpdk_io->next; + dpdk_io->next->prev = dpdk_io->prev; + break; + } + node = node->next; + } + } + while (_servq.complete(req) == -ENOBUFS) + ; +} + +int dpdk_io_service::_service_arp_request(dpdk::wait_req* req) +{ + int status = 0; + auto arp_req_data = (struct dpdk::arp_request*)req->data; + dpdk::ipv4_addr dst_addr = arp_req_data->tpa; + auto ctx_sptr = _ctx.lock(); + UHD_ASSERT_THROW(ctx_sptr); + dpdk::dpdk_port* port = ctx_sptr->get_port(arp_req_data->port); + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Requesting address for " << dpdk::ipv4_num_to_str(dst_addr)); + + rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dst_addr) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + status = -ENOMEM; + goto arp_end; + } + entry = new (entry) dpdk::arp_entry(); + entry->reqs.push_back(req); + port->_arp_table[dst_addr] = entry; + status = -EAGAIN; + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Address not in table. Sending ARP request."); + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + entry = port->_arp_table.at(dst_addr); + if (is_zero_ether_addr(&entry->mac_addr)) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "ARP: Address in table, but not populated yet. Resending ARP request."); + port->_arp_table.at(dst_addr)->reqs.push_back(req); + status = -EAGAIN; + _send_arp_request(port, 0, arp_req_data->tpa); + } else { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "ARP: Address in table."); + ether_addr_copy(&entry->mac_addr, &arp_req_data->tha); + status = 0; + } + } +arp_end: + rte_spinlock_unlock(&port->_spinlock); + return status; +} + +int dpdk_io_service::_send_arp_request( + dpdk::dpdk_port* port, dpdk::queue_id_t queue, dpdk::ipv4_addr ip) +{ + struct rte_mbuf* mbuf; + struct ether_hdr* hdr; + struct arp_hdr* arp_frame; + + mbuf = rte_pktmbuf_alloc(port->get_tx_pktbuf_pool()); + if (unlikely(mbuf == NULL)) { + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Could not allocate packet buffer for ARP request"); + return -ENOMEM; + } + + hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*); + arp_frame = (struct arp_hdr*)&hdr[1]; + + memset(hdr->d_addr.addr_bytes, 0xFF, ETHER_ADDR_LEN); + hdr->s_addr = port->get_mac_addr(); + hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_ARP); + + arp_frame->arp_hrd = rte_cpu_to_be_16(ARP_HRD_ETHER); + arp_frame->arp_pro = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + arp_frame->arp_hln = 6; + arp_frame->arp_pln = 4; + arp_frame->arp_op = rte_cpu_to_be_16(ARP_OP_REQUEST); + arp_frame->arp_data.arp_sha = port->get_mac_addr(); + arp_frame->arp_data.arp_sip = port->get_ipv4(); + memset(arp_frame->arp_data.arp_tha.addr_bytes, 0x00, ETHER_ADDR_LEN); + arp_frame->arp_data.arp_tip = ip; + + mbuf->pkt_len = 42; + mbuf->data_len = 42; + + if (rte_eth_tx_burst(port->get_port_id(), queue, &mbuf, 1) != 1) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "ARP request not sent: Descriptor ring full"); + rte_pktmbuf_free(mbuf); + return -EAGAIN; + } + return 0; +} + +/* Do a burst of RX on port */ +int dpdk_io_service::_rx_burst(dpdk::dpdk_port* port, dpdk::queue_id_t queue) +{ + struct ether_hdr* hdr; + char* l2_data; + struct rte_mbuf* bufs[RX_BURST_SIZE]; + const uint16_t num_rx = + rte_eth_rx_burst(port->get_port_id(), queue, bufs, RX_BURST_SIZE); + if (unlikely(num_rx == 0)) { + return 0; + } + + for (int buf = 0; buf < num_rx; buf++) { + uint64_t ol_flags = bufs[buf]->ol_flags; + hdr = rte_pktmbuf_mtod(bufs[buf], struct ether_hdr*); + l2_data = (char*)&hdr[1]; + switch (rte_be_to_cpu_16(hdr->ether_type)) { + case ETHER_TYPE_ARP: + _process_arp(port, queue, (struct arp_hdr*)l2_data); + rte_pktmbuf_free(bufs[buf]); + break; + case ETHER_TYPE_IPv4: + if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_BAD) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet has bad IP cksum"); + } else if ((ol_flags & PKT_RX_IP_CKSUM_MASK) == PKT_RX_IP_CKSUM_NONE) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "RX packet missing IP cksum"); + } else { + _process_ipv4(port, bufs[buf], (struct ipv4_hdr*)l2_data); + } + break; + default: + rte_pktmbuf_free(bufs[buf]); + break; + } + } + return num_rx; +} + +int dpdk_io_service::_process_arp( + dpdk::dpdk_port* port, dpdk::queue_id_t queue_id, struct arp_hdr* arp_frame) +{ + uint32_t dest_ip = arp_frame->arp_data.arp_sip; + struct ether_addr dest_addr = arp_frame->arp_data.arp_sha; + UHD_LOG_TRACE("DPDK::IO_SERVICE", + "Processing ARP packet: " << dpdk::ipv4_num_to_str(dest_ip) << " -> " + << dpdk::eth_addr_to_string(dest_addr)); + /* Add entry to ARP table */ + rte_spinlock_lock(&port->_spinlock); + struct dpdk::arp_entry* entry = NULL; + if (port->_arp_table.count(dest_ip) == 0) { + entry = (struct dpdk::arp_entry*)rte_zmalloc(NULL, sizeof(*entry), 0); + if (!entry) { + return -ENOMEM; + } + entry = new (entry) dpdk::arp_entry(); + ether_addr_copy(&dest_addr, &entry->mac_addr); + port->_arp_table[dest_ip] = entry; + } else { + entry = port->_arp_table.at(dest_ip); + ether_addr_copy(&dest_addr, &entry->mac_addr); + for (auto req : entry->reqs) { + auto arp_data = (struct dpdk::arp_request*)req->data; + ether_addr_copy(&dest_addr, &arp_data->tha); + while (_servq.complete(req) == -ENOBUFS) + ; + } + entry->reqs.clear(); + } + rte_spinlock_unlock(&port->_spinlock); + + /* Respond if this was an ARP request */ + if (arp_frame->arp_op == rte_cpu_to_be_16(ARP_OP_REQUEST) + && arp_frame->arp_data.arp_tip == port->get_ipv4()) { + UHD_LOG_TRACE("DPDK::IO_SERVICE", "Sending ARP reply."); + port->_arp_reply(queue_id, arp_frame); + } + + return 0; +} + +int dpdk_io_service::_process_ipv4( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct ipv4_hdr* pkt) +{ + bool bcast = port->dst_is_broadcast(pkt->dst_addr); + if (pkt->dst_addr != port->get_ipv4() && !bcast) { + rte_pktmbuf_free(mbuf); + return -ENODEV; + } + if (pkt->next_proto_id == IPPROTO_UDP) { + return _process_udp(port, mbuf, (struct udp_hdr*)&pkt[1], bcast); + } + rte_pktmbuf_free(mbuf); + return -EINVAL; +} + + +int dpdk_io_service::_process_udp( + dpdk::dpdk_port* port, struct rte_mbuf* mbuf, struct udp_hdr* pkt, bool /*bcast*/) +{ + // Get the link + struct dpdk::ipv4_5tuple ht_key = {.flow_type = dpdk::flow_type::FLOW_TYPE_UDP, + .src_ip = 0, + .dst_ip = port->get_ipv4(), + .src_port = 0, + .dst_port = pkt->dst_port}; + void* hash_data; + if (rte_hash_lookup_data(_rx_table, &ht_key, &hash_data) < 0) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No link entry in rx table"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Get xport list for this UDP port + auto rx_entry = (std::list<dpdk_io_if*>*)(hash_data); + if (rx_entry->empty()) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No xports for link"); + rte_pktmbuf_free(mbuf); + return -ENOENT; + } + // Turn rte_mbuf -> dpdk_frame_buff + auto link = rx_entry->front()->link; + link->enqueue_recv_mbuf(mbuf); + auto buff = link->get_recv_buff(0); + bool rcvr_found = false; + for (auto client_if : *rx_entry) { + // Check all the muxed receivers... + if (client_if->recv_cb(buff, link, link)) { + rcvr_found = true; + if (buff) { + assert(client_if->is_recv); + auto recv_io = (dpdk_recv_io*)client_if->io_client; + auto buff_ptr = (dpdk::dpdk_frame_buff*)buff.release(); + if (rte_ring_enqueue(recv_io->_recv_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + UHD_LOG_WARNING( + "DPDK::IO_SERVICE", "Dropping packet: No space in recv queue"); + } else { + recv_io->_num_frames_in_use++; + assert(recv_io->_num_frames_in_use <= recv_io->_num_recv_frames); + _wake_client(client_if); + } + } + break; + } + } + if (!rcvr_found) { + UHD_LOG_WARNING("DPDK::IO_SERVICE", "Dropping packet: No receiver xport found"); + // Release the buffer if no receiver found + link->release_recv_buff(std::move(buff)); + return -ENOENT; + } + return 0; +} + +/* Do a burst of TX on port's tx queues */ +int dpdk_io_service::_tx_burst(dpdk::dpdk_port* port) +{ + unsigned int total_tx = 0; + auto& queues = _tx_queues.at(port->get_port_id()); + + for (auto& send_io : queues) { + unsigned int num_tx = rte_ring_count(send_io->_send_queue); + num_tx = (num_tx < TX_BURST_SIZE) ? num_tx : TX_BURST_SIZE; + bool replaced_buffers = false; + for (unsigned int i = 0; i < num_tx; i++) { + size_t frame_size = send_io->_dpdk_io_if.link->get_send_frame_size(); + if (send_io->_fc_cb && !send_io->_fc_cb(frame_size)) { + break; + } + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(send_io->_send_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "TX Q Count doesn't match actual"); + break; + } + send_io->_send_cb(frame_buff::uptr(buff_ptr), send_io->_dpdk_io_if.link); + // Attempt to replace buffer + buff_ptr = (dpdk::dpdk_frame_buff*)send_io->_dpdk_io_if.link->get_send_buff(0) + .release(); + if (!buff_ptr) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", + "TX mempool out of memory. Please increase dpdk_num_mbufs."); + send_io->_num_frames_in_use--; + } else if (rte_ring_enqueue(send_io->_buffer_queue, buff_ptr)) { + rte_pktmbuf_free(buff_ptr->get_pktmbuf()); + send_io->_num_frames_in_use--; + } else { + replaced_buffers = true; + } + } + if (replaced_buffers) { + _wake_client(&send_io->_dpdk_io_if); + } + total_tx += num_tx; + } + + return total_tx; +} + +int dpdk_io_service::_rx_release(dpdk::dpdk_port* port) +{ + unsigned int total_bufs = 0; + auto& queues = _recv_xport_map.at(port->get_port_id()); + + for (auto& recv_io : queues) { + unsigned int num_buf = rte_ring_count(recv_io->_release_queue); + num_buf = (num_buf < RX_BURST_SIZE) ? num_buf : RX_BURST_SIZE; + for (unsigned int i = 0; i < num_buf; i++) { + dpdk::dpdk_frame_buff* buff_ptr; + int status = rte_ring_dequeue(recv_io->_release_queue, (void**)&buff_ptr); + if (status) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "RX Q Count doesn't match actual"); + break; + } + recv_io->_fc_cb(frame_buff::uptr(buff_ptr), + recv_io->_dpdk_io_if.link, + recv_io->_dpdk_io_if.link); + recv_io->_num_frames_in_use--; + } + total_bufs += num_buf; + } + + return total_bufs; +} + +uint16_t dpdk_io_service::_get_unique_client_id() +{ + std::lock_guard<std::mutex> lock(_mutex); + if (_client_id_set.size() >= MAX_CLIENTS) { + UHD_LOG_ERROR("DPDK::IO_SERVICE", "Exceeded maximum number of clients"); + throw uhd::runtime_error("DPDK::IO_SERVICE: Exceeded maximum number of clients"); + } + + uint16_t id = _next_client_id++; + while (_client_id_set.count(id)) { + id = _next_client_id++; + } + _client_id_set.insert(id); + return id; +} + +void dpdk_io_service::_wake_client(dpdk_io_if* dpdk_io) +{ + dpdk::wait_req* req; + if (dpdk_io->is_recv) { + auto recv_io = static_cast<dpdk_recv_io*>(dpdk_io->io_client); + req = recv_io->_waiter; + } else { + auto send_io = static_cast<dpdk_send_io*>(dpdk_io->io_client); + req = send_io->_waiter; + } + bool stat = req->mutex.try_lock(); + if (stat) { + bool active_req = !req->complete; + if (dpdk_io->next) { + // On the list: Take it off + if (dpdk_io->next == dpdk_io) { + // Only node on the list + _retry_head = NULL; + } else { + // Other nodes are on the list + if (_retry_head == dpdk_io) { + // Move list head to next + _retry_head = dpdk_io->next; + } + dpdk_io->next->prev = dpdk_io->prev; + dpdk_io->prev->next = dpdk_io->next; + } + dpdk_io->next = NULL; + dpdk_io->prev = NULL; + } + if (active_req) { + req->complete = true; + req->cond.notify_one(); + } + req->mutex.unlock(); + if (active_req) { + wait_req_put(req); + } + } else { + // Put on the retry list, if it isn't already + if (!dpdk_io->next) { + if (_retry_head) { + dpdk_io->next = _retry_head; + dpdk_io->prev = _retry_head->prev; + _retry_head->prev->next = dpdk_io; + _retry_head->prev = dpdk_io; + } else { + _retry_head = dpdk_io; + dpdk_io->next = dpdk_io; + dpdk_io->prev = dpdk_io; + } + } + } +} |