From 0e2464ad888230054b04a4f3fb192ea8dc5721b0 Mon Sep 17 00:00:00 2001 From: Ciro Nishiguchi Date: Tue, 27 Aug 2019 16:08:05 -0500 Subject: rfnoc: Move data xport sep configuration to static methods Move the configuration logic for stream endpoints to static methods of the chdr data transports. This separates those interactions from the main transport code, simplifying both. It also makes it easier to use the transports with mock link objects. --- host/lib/rfnoc/CMakeLists.txt | 2 + host/lib/rfnoc/chdr_ctrl_xport.cpp | 19 +-- host/lib/rfnoc/chdr_rx_data_xport.cpp | 203 +++++++++++++++++++++++++++ host/lib/rfnoc/chdr_tx_data_xport.cpp | 249 ++++++++++++++++++++++++++++++++++ 4 files changed, 464 insertions(+), 9 deletions(-) create mode 100644 host/lib/rfnoc/chdr_rx_data_xport.cpp create mode 100644 host/lib/rfnoc/chdr_tx_data_xport.cpp (limited to 'host/lib/rfnoc') diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index 73de394e3..2892e0d6d 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -23,6 +23,8 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/chdr_types.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_packet.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_rx_data_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_tx_data_xport.cpp ${CMAKE_CURRENT_SOURCE_DIR}/client_zero.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ctrl_iface.cpp ${CMAKE_CURRENT_SOURCE_DIR}/device_id.cpp diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp index a6cfda0de..929875dbd 100644 --- a/host/lib/rfnoc/chdr_ctrl_xport.cpp +++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp @@ -16,10 +16,11 @@ using namespace uhd::transport; chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames) - : _my_epid(my_epid) + : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic()) { /* Make dumb send pipe */ send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, @@ -34,10 +35,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_CTRL) && (dst_epid == _my_epid)) { @@ -59,10 +60,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_MGMT) && (dst_epid == _my_epid)) { diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp new file mode 100644 index 000000000..bcd9f7ea9 --- /dev/null +++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp @@ -0,0 +1,203 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include +#include +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +rx_flow_ctrl_sender::rx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.first) +{ + _fc_packet = pkt_factory.make_strs(); + _fc_strs_pyld.src_epid = sep_ids.second; +} + +void rx_flow_ctrl_sender::set_capacity(const stream_buff_params_t& recv_capacity) +{ + _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; + _fc_strs_pyld.capacity_pkts = recv_capacity.packets; +} + +chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_recv_frames, + const fc_params_t& fc_params) + : _fc_state(epids, fc_params.freq) + , _fc_sender(pkt_factory, epids) + , _epid(epids.second) +{ + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Creating rx xport with local epid=" << epids.second + << ", remote epid=" << epids.first); + + _recv_packet = pkt_factory.make_generic(); + _fc_sender.set_capacity(fc_params.buff_capacity); + + // Calculate max payload size + const size_t pyld_offset = + _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + + // Make data transport + auto recv_cb = + [this](buff_t::uptr& buff, recv_link_if* recv_link, send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + auto fc_cb = + [this](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) { + this->_fc_callback(std::move(buff), recv_link, send_link); + }; + + // Needs just a single send frame for responses + _recv_io = io_srv->make_recv_client(recv_link, + num_recv_frames, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Stream endpoint was configured with:" + << std::endl + << "capacity bytes=" << fc_params.buff_capacity.bytes + << ", packets=" << fc_params.buff_capacity.packets << std::endl + << "fc frequency bytes=" << fc_params.freq.bytes + << ", packets=" << fc_params.freq.packets); +} + +chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const stream_buff_params_t& recv_capacity, + const stream_buff_params_t& fc_freq, + const stream_buff_params_t& fc_headroom, + const bool lossy_xport) +{ + const sep_id_t remote_epid = epids.first; + const sep_id_t local_epid = epids.second; + + rx_flow_ctrl_sender fc_sender(pkt_factory, epids); + chdr::chdr_packet::uptr pkt = pkt_factory.make_generic(); + fc_sender.set_capacity(recv_capacity); + chdr::strc_payload strc; + + auto recv_cb = [&pkt, local_epid, &strc](buff_t::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + pkt->refresh(buff->data()); + const auto header = pkt->get_chdr_header(); + const auto dst_epid = header.get_dst_epid(); + + if (dst_epid != local_epid) { + return false; + } + + const auto type = header.get_pkt_type(); + + if (type != chdr::PKT_TYPE_STRC) { + return false; + } + + strc.deserialize(pkt->get_payload_const_ptr_as(), + pkt->get_payload_size() / sizeof(uint64_t), + pkt->conv_to_host()); + + if (strc.op_code != chdr::STRC_INIT) { + throw uhd::value_error("Unexpected opcode value in STRC packet."); + } + + return true; + }; + + auto fc_cb = [&fc_sender](buff_t::uptr buff, + recv_link_if* recv_link, + send_link_if* send_link) { + recv_link->release_recv_buff(std::move(buff)); + + // Send a strs response to configure flow control on the sender. The + // byte and packet counts are not important since they are reset by + // the stream endpoint on receipt of this packet. + fc_sender.send_strs(send_link, {0, 0}); + }; + + // Create a temporary recv_io to receive the strc init + auto recv_io = io_srv->make_recv_client(recv_link, + /* num_recv_frames*/ 1, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + // Create a control transport with the rx data links to send mgmt packets + // needed to setup the stream + // Piggyback on frames from the recv_io_if + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 0, // num_send_frames + 0); // num_recv_frames + + // Setup a route to the EPID + // Note that this may be gratuitous--The endpoint may already have been set up + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + // Initialize flow control - first, the management portal sends a stream + // command containing its requested flow control frequency + mgmt_portal.config_local_rx_stream_start(*ctrl_xport, + remote_epid, + lossy_xport, + pyld_buff_fmt, + mdata_buff_fmt, + fc_freq, + fc_headroom); + + // Now, release the buffer. In the flow control callback for the recv_io + // (fc_cb above), we send a stream status containing the xport buffer + // capacity. + auto buff = recv_io->get_recv_buff(100); + if (!buff) { + throw uhd::runtime_error( + "rx xport timed out getting a response from mgmt_portal"); + } + recv_io->release_recv_buff(std::move(buff)); + + // Finally, let the management portal know the setup is complete + mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid); + + // The flow control frequency requested is contained in the strc + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Received strc init with fc freq" + << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + + fc_params_t fc_params; + fc_params.buff_capacity = recv_capacity; + fc_params.freq = {strc.num_bytes, static_cast(strc.num_pkts)}; + + recv_io.reset(); + ctrl_xport.reset(); + + return fc_params; +} diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp new file mode 100644 index 000000000..cb28c7ac9 --- /dev/null +++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp @@ -0,0 +1,249 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include +#include +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +tx_flow_ctrl_sender::tx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.second) +{ + _fc_packet = pkt_factory.make_strc(); + _fc_strc_pyld.src_epid = sep_ids.first; + _fc_strc_pyld.op_code = chdr::STRC_RESYNC; +} + +chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_send_frames, + const fc_params_t fc_params) + : _fc_state(fc_params.buff_capacity) + , _fc_sender(pkt_factory, epids) + , _epid(epids.first) +{ + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Creating tx xport with local epid=" << epids.first + << ", remote epid=" << epids.second); + + _send_header.set_dst_epid(epids.second); + _send_packet = pkt_factory.make_generic(); + _recv_packet = pkt_factory.make_generic(); + + // Calculate max payload size + const size_t pyld_offset = + _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + + // Now create the send I/O we will use for data + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + this->_send_callback(buff, send_link); + }; + + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + // Needs just a single recv frame for strs packets + _send_io = io_srv->make_send_client(send_link, + num_send_frames, + send_cb, + recv_link, + /* num_recv_frames */ 1, + recv_cb); +} + +/* + * To configure flow control, we need to send an init strc packet, then + * receive a strs containing the stream endpoint ingress buffer size. We + * then repeat this (now that we know the buffer size) to configure the flow + * control frequency. To avoid having this logic within the data packet + * processing flow, we use temporary send and recv I/O instances with + * simple callbacks here. + */ +static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const sep_id_pair_t epids, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); + chdr::chdr_packet::uptr recv_packet = pkt_factory.make_generic(); + + // No flow control at initialization, just release all send buffs + auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + }; + + // For recv, just queue strs packets for recv_io to read + auto recv_cb = [&recv_packet, epids](frame_buff::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + recv_packet->refresh(buff->data()); + const auto header = recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + return (dst_epid == epids.first && type == chdr::PKT_TYPE_STRS); + }; + + // No flow control at initialization, just release all recv buffs + auto fc_cb = + [](frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* /*send_link*/) { + recv_link->release_recv_buff(std::move(buff)); + }; + + auto send_io = io_srv->make_send_client(send_link, + 1, // num_send_frames + send_cb, + nullptr, + 0, // num_recv_frames + nullptr); + + auto recv_io = io_srv->make_recv_client(recv_link, + 1, // num_recv_frames + recv_cb, + nullptr, + 0, // num_send_frames + fc_cb); + + // Function to send a strc init + auto send_strc_init = [&send_io, epids, &strc_packet]( + const stream_buff_params_t fc_freq = {0, 0}) { + frame_buff::uptr buff = send_io->get_send_buff(0); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out getting a send buffer for strc init"); + } + + chdr::chdr_header header; + header.set_seq_num(0); + header.set_dst_epid(epids.second); + + chdr::strc_payload strc_pyld; + strc_pyld.src_epid = epids.first; + strc_pyld.op_code = chdr::STRC_INIT; + strc_pyld.num_bytes = fc_freq.bytes; + strc_pyld.num_pkts = fc_freq.packets; + strc_packet->refresh(buff->data(), header, strc_pyld); + + const size_t size = header.get_length(); + buff->set_packet_size(size); + send_io->release_send_buff(std::move(buff)); + }; + + // Function to receive a strs, returns buffer capacity + auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { + frame_buff::uptr buff = recv_io->get_recv_buff(200); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out wating for a strs packet during initialization"); + } + + recv_packet->refresh(buff->data()); + UHD_ASSERT_THROW( + recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); + chdr::strs_payload strs; + strs.deserialize(recv_packet->get_payload_const_ptr_as(), + recv_packet->get_payload_size() / sizeof(uint64_t), + recv_packet->conv_to_host()); + + recv_io->release_recv_buff(std::move(buff)); + + return {strs.capacity_bytes, static_cast(strs.capacity_pkts)}; + }; + + // Send a strc init to get the buffer size + send_strc_init(); + stream_buff_params_t capacity = recv_strs(); + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Received strs initializing buffer capacity to " << capacity.bytes << " bytes"); + + // Calculate the requested fc_freq parameters + stream_buff_params_t fc_freq = { + static_cast(std::ceil(double(capacity.bytes) * fc_freq_ratio)), + static_cast(std::ceil(double(capacity.packets) * fc_freq_ratio))}; + + const size_t headroom_bytes = + static_cast(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); + const size_t headroom_packets = + static_cast(std::ceil(double(capacity.packets) * fc_headroom_ratio)); + + fc_freq.bytes -= headroom_bytes; + fc_freq.packets -= headroom_packets; + + // Send a strc init to configure fc freq + send_strc_init(fc_freq); + recv_strs(); + + // Release temporary I/O service interfaces to disconnect from it + send_io.reset(); + recv_io.reset(); + + return {capacity}; +} + +chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + const sep_id_t remote_epid = epids.second; + const sep_id_t local_epid = epids.first; + + // Create a control transport with the tx data links to send mgmt packets + // needed to setup the stream. Only need one frame for this. + auto ctrl_xport = chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 1, // num_send_frames + 1); // num_recv_frames + + // Setup a route to the EPID + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + mgmt_portal.config_local_tx_stream( + *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + + // We no longer need the control xport, release it so + // the control xport is no longer connected to the I/O service. + ctrl_xport.reset(); + + return configure_flow_ctrl(io_srv, + recv_link, + send_link, + pkt_factory, + epids, + fc_freq_ratio, + fc_headroom_ratio); +} -- cgit v1.2.3