// // Copyright 2019 Ettus Research, a National Instruments Brand // // SPDX-License-Identifier: GPL-3.0-or-later // #include #include #include #include #include #include #include #include #include namespace uhd { namespace transport { namespace { constexpr double SEND_TIMEOUT_MS = 500; // seconds } class dpdk_simple_impl : public dpdk_simple { public: dpdk_simple_impl(const std::string& addr, const std::string& port) { link_params_t link_params = _get_default_link_params(); _link = uhd::transport::udp_dpdk_link::make(addr, port, link_params); UHD_LOG_TRACE("DPDK::SIMPLE", "Creating simple UDP object for " << addr << ":" << port << ", DPDK port index " << _link->get_port()->get_port_id()); // The context must be initialized, or we'd never get here auto ctx = uhd::transport::dpdk::dpdk_ctx::get(); UHD_ASSERT_THROW(ctx->is_init_done()); // Init I/O service _port_id = _link->get_port()->get_port_id(); _io_service = ctx->get_io_service(_port_id); // This is normally done by the I/O service manager, but with DPDK, this // is all it does so we skip that step UHD_LOG_TRACE("DPDK::SIMPLE", "Attaching link to I/O service..."); _io_service->attach_recv_link(_link); _io_service->attach_send_link(_link); auto recv_cb = [this](buff_t::uptr& buff, recv_link_if*, send_link_if*) { return this->_recv_callback(buff); }; auto fc_cb = [this](buff_t::uptr buff, recv_link_if*, send_link_if*) { this->_recv_fc_callback(std::move(buff)); }; _recv_io = _io_service->make_recv_client(_link, link_params.num_recv_frames, recv_cb, nullptr, // No send/fc link 0, // No send frames fc_cb); auto send_cb = [this](buff_t::uptr buff, transport::send_link_if*) { this->_send_callback(std::move(buff)); }; _send_io = _io_service->make_send_client(_link, link_params.num_send_frames, send_cb, nullptr, // no FC link 0, nullptr, // No receive callback necessary [](const size_t) { return true; } // We can always send ); UHD_LOG_TRACE("DPDK::SIMPLE", "Constructor complete"); } ~dpdk_simple_impl(void) { UHD_LOG_TRACE("DPDK::SIMPLE", "~dpdk_simple_impl(), DPDK port index " << _link->get_port()->get_port_id()); // Disconnect the clients from the I/O service _send_io.reset(); _recv_io.reset(); // Disconnect the link from the I/O service _io_service->detach_recv_link(_link); _io_service->detach_send_link(_link); } /*! Send and release outstanding buffer * * \param length bytes of data to send * \return number of bytes sent (releases buffer if sent) */ size_t send(const boost::asio::const_buffer& user_buff) { // Extract buff and sanity check const size_t nbytes = boost::asio::buffer_size(user_buff); UHD_ASSERT_THROW(nbytes <= _link->get_send_frame_size()) const uint8_t* user_data = boost::asio::buffer_cast(user_buff); // Get send buff auto buff = _send_io->get_send_buff(SEND_TIMEOUT_MS); UHD_ASSERT_THROW(buff); buff->set_packet_size(nbytes); std::memcpy(buff->data(), user_data, nbytes); // Release send buff (send the packet) _send_io->release_send_buff(std::move(buff)); return nbytes; } /*! Receive a single packet. * * Buffer provided by transport (must be freed before next operation). * * \param buf a pointer to place to write buffer location * \param timeout the timeout in seconds * \return the number of bytes received or zero on timeout */ size_t recv(const boost::asio::mutable_buffer& user_buff, double timeout) { size_t user_buff_size = boost::asio::buffer_size(user_buff); uint8_t* user_data = boost::asio::buffer_cast(user_buff); auto buff = _recv_io->get_recv_buff(static_cast(timeout * 1e3)); if (!buff) { return 0; } // Extract the sender's address. This is only possible because we know // the memory layout of the buff struct udp_hdr* udp_hdr_end = (struct udp_hdr*)buff->data(); struct ipv4_hdr* ip_hdr_end = (struct ipv4_hdr*)(&udp_hdr_end[-1]); struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)(&ip_hdr_end[-1]); _last_recv_addr = ip_hdr->src_addr; // Extract the buffer data const size_t copy_len = std::min(user_buff_size, buff->packet_size()); if (copy_len < buff->packet_size()) { UHD_LOG_WARNING("DPDK", "Truncating recv packet"); } std::memcpy(user_data, buff->data(), copy_len); // Housekeeping _recv_io->release_recv_buff(std::move(buff)); return copy_len; } std::string get_recv_addr(void) { return dpdk::ipv4_num_to_str(_last_recv_addr); } std::string get_send_addr(void) { return dpdk::ipv4_num_to_str(_link->get_remote_ipv4()); } private: using buff_t = frame_buff; link_params_t _get_default_link_params() { link_params_t link_params; link_params.recv_frame_size = 8000; link_params.send_frame_size = 8000; link_params.num_recv_frames = 1; link_params.num_send_frames = 1; link_params.recv_buff_size = 8000; link_params.send_buff_size = 8000; return link_params; } void _send_callback(buff_t::uptr buff) { _link->release_send_buff(std::move(buff)); } bool _recv_callback(buff_t::uptr&) { // Queue it up return true; } void _recv_fc_callback(buff_t::uptr buff) { _link->release_recv_buff(std::move(buff)); } /*** Attributes **********************************************************/ unsigned int _port_id; uint32_t _last_recv_addr; udp_dpdk_link::sptr _link; dpdk_io_service::sptr _io_service; send_io_if::sptr _send_io; recv_io_if::sptr _recv_io; }; dpdk_simple::~dpdk_simple(void) {} /*********************************************************************** * DPDK simple transport public make functions **********************************************************************/ udp_simple::sptr dpdk_simple::make_connected( const std::string& addr, const std::string& port) { return udp_simple::sptr(new dpdk_simple_impl(addr, port)); } // For DPDK, this is not special and the same as make_connected udp_simple::sptr dpdk_simple::make_broadcast( const std::string& addr, const std::string& port) { return udp_simple::sptr(new dpdk_simple_impl(addr, port)); } }} // namespace uhd::transport