// // Copyright 2019 Ettus Research, a National Instruments Brand // // SPDX-License-Identifier: GPL-3.0-or-later // #include #include #include #include #include #include using namespace uhd; using namespace uhd::rfnoc; using namespace uhd::rfnoc::detail; using namespace uhd::transport; rx_flow_ctrl_sender::rx_flow_ctrl_sender( const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) : _dst_epid(sep_ids.first) { _fc_packet = pkt_factory.make_strs(); _fc_strs_pyld.src_epid = sep_ids.second; } void rx_flow_ctrl_sender::set_capacity(const stream_buff_params_t& recv_capacity) { _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; _fc_strs_pyld.capacity_pkts = recv_capacity.packets; } chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, uhd::transport::recv_link_if::sptr recv_link, uhd::transport::send_link_if::sptr send_link, const chdr::chdr_packet_factory& pkt_factory, const uhd::rfnoc::sep_id_pair_t& epids, const size_t num_recv_frames, const fc_params_t& fc_params, disconnect_callback_t disconnect) : _fc_state(epids, fc_params.freq) , _fc_sender(pkt_factory, epids) , _epid(epids.second) , _chdr_w_bytes(chdr_w_to_bits(pkt_factory.get_chdr_w()) / 8) , _disconnect(disconnect) { UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", "Creating rx xport with local epid=" << epids.second << ", remote epid=" << epids.first); _recv_packet = pkt_factory.make_generic(); _recv_packet_cb = pkt_factory.make_generic(); _fc_sender.set_capacity(fc_params.buff_capacity); // Calculate max payload size const size_t pyld_offset = _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; // Make data transport auto recv_cb = [this](buff_t::uptr& buff, recv_link_if* recv_link, send_link_if* send_link) { return this->_recv_callback(buff, recv_link, send_link); }; auto fc_cb = [this](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) { this->_fc_callback(std::move(buff), recv_link, send_link); }; // Needs just a single send frame for responses _recv_io = io_srv->make_recv_client(recv_link, num_recv_frames, recv_cb, send_link, /* num_send_frames*/ 1, fc_cb); UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", "Stream endpoint was configured with:" << std::endl << "capacity bytes=" << fc_params.buff_capacity.bytes << ", packets=" << fc_params.buff_capacity.packets << std::endl << "fc frequency bytes=" << fc_params.freq.bytes << ", packets=" << fc_params.freq.packets); } chdr_rx_data_xport::~chdr_rx_data_xport() { // Release recv_io before allowing members needed by callbacks be destroyed _recv_io.reset(); // Disconnect the links _disconnect(); } chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv, recv_link_if::sptr recv_link, send_link_if::sptr send_link, const chdr::chdr_packet_factory& pkt_factory, mgmt::mgmt_portal& mgmt_portal, const sep_id_pair_t& epids, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, const stream_buff_params_t& recv_capacity, const stream_buff_params_t& fc_freq, const stream_buff_params_t& fc_headroom, const bool lossy_xport, disconnect_callback_t disconnect) { const sep_id_t remote_epid = epids.first; const sep_id_t local_epid = epids.second; rx_flow_ctrl_sender fc_sender(pkt_factory, epids); chdr::chdr_packet_writer::uptr pkt = pkt_factory.make_generic(); fc_sender.set_capacity(recv_capacity); chdr::strc_payload strc; auto recv_cb = [&pkt, local_epid, &strc](buff_t::uptr& buff, recv_link_if* /*recv_link*/, send_link_if* /*send_link*/) { pkt->refresh(buff->data()); const auto header = pkt->get_chdr_header(); const auto dst_epid = header.get_dst_epid(); if (dst_epid != local_epid) { return false; } const auto type = header.get_pkt_type(); if (type != chdr::PKT_TYPE_STRC) { return false; } strc.deserialize(pkt->get_payload_const_ptr_as(), pkt->get_payload_size() / sizeof(uint64_t), pkt->conv_to_host()); if (strc.op_code != chdr::STRC_INIT) { throw uhd::value_error("Unexpected opcode value in STRC packet."); } return true; }; auto fc_cb = [&fc_sender](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) { recv_link->release_recv_buff(std::move(buff)); // Send a strs response to configure flow control on the sender. The // byte and packet counts are not important since they are reset by // the stream endpoint on receipt of this packet. fc_sender.send_strs(send_link, {0, 0}); }; // Create a temporary recv_io to receive the strc init auto recv_io = io_srv->make_recv_client(recv_link, 1, // num_recv_frames recv_cb, send_link, 1, // num_send_frames fc_cb); // Create a control transport with the rx data links to send mgmt packets // needed to setup the stream // Piggyback on frames from the recv_io_if auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, send_link, recv_link, pkt_factory, local_epid, 1, // num_send_frames 1, // num_recv_frames disconnect); // Setup a route to the EPID // Note that this may be gratuitous--The endpoint may already have been set up mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); // Initialize flow control - first, the management portal sends a stream // command containing its requested flow control frequency mgmt_portal.config_local_rx_stream_start(*ctrl_xport, remote_epid, lossy_xport, pyld_buff_fmt, mdata_buff_fmt, fc_freq, fc_headroom); // Now, release the buffer. In the flow control callback for the recv_io // (fc_cb above), we send a stream status containing the xport buffer // capacity. auto buff = recv_io->get_recv_buff(100); if (!buff) { throw uhd::runtime_error( "rx xport timed out getting a response from mgmt_portal"); } recv_io->release_recv_buff(std::move(buff)); // Finally, let the management portal know the setup is complete const bool fc_enabled = (fc_freq.bytes != 0) || (fc_freq.packets != 0); mgmt_portal.config_local_rx_stream_commit( *ctrl_xport, remote_epid, 0.2 /*default timeout*/, fc_enabled); // The flow control frequency requested is contained in the strc UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", "Received strc init with fc freq" << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); fc_params_t fc_params; fc_params.buff_capacity = recv_capacity; fc_params.freq = {strc.num_bytes, static_cast(strc.num_pkts)}; recv_io.reset(); ctrl_xport.reset(); return fc_params; }