diff options
Diffstat (limited to 'host/lib/rfnoc')
-rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_ctrl_xport.cpp | 19 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_rx_data_xport.cpp | 203 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_tx_data_xport.cpp | 249 |
4 files changed, 464 insertions, 9 deletions
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index 73de394e3..2892e0d6d 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -23,6 +23,8 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/chdr_types.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_packet.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_rx_data_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_tx_data_xport.cpp ${CMAKE_CURRENT_SOURCE_DIR}/client_zero.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ctrl_iface.cpp ${CMAKE_CURRENT_SOURCE_DIR}/device_id.cpp diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp index a6cfda0de..929875dbd 100644 --- a/host/lib/rfnoc/chdr_ctrl_xport.cpp +++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp @@ -16,10 +16,11 @@ using namespace uhd::transport; chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames) - : _my_epid(my_epid) + : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic()) { /* Make dumb send pipe */ send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, @@ -34,10 +35,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx<uint64_t>(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_CTRL) && (dst_epid == _my_epid)) { @@ -59,10 +60,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx<uint64_t>(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_MGMT) && (dst_epid == _my_epid)) { diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp new file mode 100644 index 000000000..bcd9f7ea9 --- /dev/null +++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp @@ -0,0 +1,203 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +rx_flow_ctrl_sender::rx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.first) +{ + _fc_packet = pkt_factory.make_strs(); + _fc_strs_pyld.src_epid = sep_ids.second; +} + +void rx_flow_ctrl_sender::set_capacity(const stream_buff_params_t& recv_capacity) +{ + _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; + _fc_strs_pyld.capacity_pkts = recv_capacity.packets; +} + +chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_recv_frames, + const fc_params_t& fc_params) + : _fc_state(epids, fc_params.freq) + , _fc_sender(pkt_factory, epids) + , _epid(epids.second) +{ + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Creating rx xport with local epid=" << epids.second + << ", remote epid=" << epids.first); + + _recv_packet = pkt_factory.make_generic(); + _fc_sender.set_capacity(fc_params.buff_capacity); + + // Calculate max payload size + const size_t pyld_offset = + _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + + // Make data transport + auto recv_cb = + [this](buff_t::uptr& buff, recv_link_if* recv_link, send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + auto fc_cb = + [this](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) { + this->_fc_callback(std::move(buff), recv_link, send_link); + }; + + // Needs just a single send frame for responses + _recv_io = io_srv->make_recv_client(recv_link, + num_recv_frames, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Stream endpoint was configured with:" + << std::endl + << "capacity bytes=" << fc_params.buff_capacity.bytes + << ", packets=" << fc_params.buff_capacity.packets << std::endl + << "fc frequency bytes=" << fc_params.freq.bytes + << ", packets=" << fc_params.freq.packets); +} + +chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const stream_buff_params_t& recv_capacity, + const stream_buff_params_t& fc_freq, + const stream_buff_params_t& fc_headroom, + const bool lossy_xport) +{ + const sep_id_t remote_epid = epids.first; + const sep_id_t local_epid = epids.second; + + rx_flow_ctrl_sender fc_sender(pkt_factory, epids); + chdr::chdr_packet::uptr pkt = pkt_factory.make_generic(); + fc_sender.set_capacity(recv_capacity); + chdr::strc_payload strc; + + auto recv_cb = [&pkt, local_epid, &strc](buff_t::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + pkt->refresh(buff->data()); + const auto header = pkt->get_chdr_header(); + const auto dst_epid = header.get_dst_epid(); + + if (dst_epid != local_epid) { + return false; + } + + const auto type = header.get_pkt_type(); + + if (type != chdr::PKT_TYPE_STRC) { + return false; + } + + strc.deserialize(pkt->get_payload_const_ptr_as<uint64_t>(), + pkt->get_payload_size() / sizeof(uint64_t), + pkt->conv_to_host<uint64_t>()); + + if (strc.op_code != chdr::STRC_INIT) { + throw uhd::value_error("Unexpected opcode value in STRC packet."); + } + + return true; + }; + + auto fc_cb = [&fc_sender](buff_t::uptr buff, + recv_link_if* recv_link, + send_link_if* send_link) { + recv_link->release_recv_buff(std::move(buff)); + + // Send a strs response to configure flow control on the sender. The + // byte and packet counts are not important since they are reset by + // the stream endpoint on receipt of this packet. + fc_sender.send_strs(send_link, {0, 0}); + }; + + // Create a temporary recv_io to receive the strc init + auto recv_io = io_srv->make_recv_client(recv_link, + /* num_recv_frames*/ 1, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + // Create a control transport with the rx data links to send mgmt packets + // needed to setup the stream + // Piggyback on frames from the recv_io_if + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 0, // num_send_frames + 0); // num_recv_frames + + // Setup a route to the EPID + // Note that this may be gratuitous--The endpoint may already have been set up + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + // Initialize flow control - first, the management portal sends a stream + // command containing its requested flow control frequency + mgmt_portal.config_local_rx_stream_start(*ctrl_xport, + remote_epid, + lossy_xport, + pyld_buff_fmt, + mdata_buff_fmt, + fc_freq, + fc_headroom); + + // Now, release the buffer. In the flow control callback for the recv_io + // (fc_cb above), we send a stream status containing the xport buffer + // capacity. + auto buff = recv_io->get_recv_buff(100); + if (!buff) { + throw uhd::runtime_error( + "rx xport timed out getting a response from mgmt_portal"); + } + recv_io->release_recv_buff(std::move(buff)); + + // Finally, let the management portal know the setup is complete + mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid); + + // The flow control frequency requested is contained in the strc + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Received strc init with fc freq" + << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + + fc_params_t fc_params; + fc_params.buff_capacity = recv_capacity; + fc_params.freq = {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}; + + recv_io.reset(); + ctrl_xport.reset(); + + return fc_params; +} diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp new file mode 100644 index 000000000..cb28c7ac9 --- /dev/null +++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp @@ -0,0 +1,249 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +tx_flow_ctrl_sender::tx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.second) +{ + _fc_packet = pkt_factory.make_strc(); + _fc_strc_pyld.src_epid = sep_ids.first; + _fc_strc_pyld.op_code = chdr::STRC_RESYNC; +} + +chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_send_frames, + const fc_params_t fc_params) + : _fc_state(fc_params.buff_capacity) + , _fc_sender(pkt_factory, epids) + , _epid(epids.first) +{ + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Creating tx xport with local epid=" << epids.first + << ", remote epid=" << epids.second); + + _send_header.set_dst_epid(epids.second); + _send_packet = pkt_factory.make_generic(); + _recv_packet = pkt_factory.make_generic(); + + // Calculate max payload size + const size_t pyld_offset = + _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + + // Now create the send I/O we will use for data + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + this->_send_callback(buff, send_link); + }; + + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + // Needs just a single recv frame for strs packets + _send_io = io_srv->make_send_client(send_link, + num_send_frames, + send_cb, + recv_link, + /* num_recv_frames */ 1, + recv_cb); +} + +/* + * To configure flow control, we need to send an init strc packet, then + * receive a strs containing the stream endpoint ingress buffer size. We + * then repeat this (now that we know the buffer size) to configure the flow + * control frequency. To avoid having this logic within the data packet + * processing flow, we use temporary send and recv I/O instances with + * simple callbacks here. + */ +static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const sep_id_pair_t epids, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); + chdr::chdr_packet::uptr recv_packet = pkt_factory.make_generic(); + + // No flow control at initialization, just release all send buffs + auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + }; + + // For recv, just queue strs packets for recv_io to read + auto recv_cb = [&recv_packet, epids](frame_buff::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + recv_packet->refresh(buff->data()); + const auto header = recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + return (dst_epid == epids.first && type == chdr::PKT_TYPE_STRS); + }; + + // No flow control at initialization, just release all recv buffs + auto fc_cb = + [](frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* /*send_link*/) { + recv_link->release_recv_buff(std::move(buff)); + }; + + auto send_io = io_srv->make_send_client(send_link, + 1, // num_send_frames + send_cb, + nullptr, + 0, // num_recv_frames + nullptr); + + auto recv_io = io_srv->make_recv_client(recv_link, + 1, // num_recv_frames + recv_cb, + nullptr, + 0, // num_send_frames + fc_cb); + + // Function to send a strc init + auto send_strc_init = [&send_io, epids, &strc_packet]( + const stream_buff_params_t fc_freq = {0, 0}) { + frame_buff::uptr buff = send_io->get_send_buff(0); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out getting a send buffer for strc init"); + } + + chdr::chdr_header header; + header.set_seq_num(0); + header.set_dst_epid(epids.second); + + chdr::strc_payload strc_pyld; + strc_pyld.src_epid = epids.first; + strc_pyld.op_code = chdr::STRC_INIT; + strc_pyld.num_bytes = fc_freq.bytes; + strc_pyld.num_pkts = fc_freq.packets; + strc_packet->refresh(buff->data(), header, strc_pyld); + + const size_t size = header.get_length(); + buff->set_packet_size(size); + send_io->release_send_buff(std::move(buff)); + }; + + // Function to receive a strs, returns buffer capacity + auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { + frame_buff::uptr buff = recv_io->get_recv_buff(200); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out wating for a strs packet during initialization"); + } + + recv_packet->refresh(buff->data()); + UHD_ASSERT_THROW( + recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); + chdr::strs_payload strs; + strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(), + recv_packet->get_payload_size() / sizeof(uint64_t), + recv_packet->conv_to_host<uint64_t>()); + + recv_io->release_recv_buff(std::move(buff)); + + return {strs.capacity_bytes, static_cast<uint32_t>(strs.capacity_pkts)}; + }; + + // Send a strc init to get the buffer size + send_strc_init(); + stream_buff_params_t capacity = recv_strs(); + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Received strs initializing buffer capacity to " << capacity.bytes << " bytes"); + + // Calculate the requested fc_freq parameters + stream_buff_params_t fc_freq = { + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)), + static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_freq_ratio))}; + + const size_t headroom_bytes = + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); + const size_t headroom_packets = + static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_headroom_ratio)); + + fc_freq.bytes -= headroom_bytes; + fc_freq.packets -= headroom_packets; + + // Send a strc init to configure fc freq + send_strc_init(fc_freq); + recv_strs(); + + // Release temporary I/O service interfaces to disconnect from it + send_io.reset(); + recv_io.reset(); + + return {capacity}; +} + +chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + const sep_id_t remote_epid = epids.second; + const sep_id_t local_epid = epids.first; + + // Create a control transport with the tx data links to send mgmt packets + // needed to setup the stream. Only need one frame for this. + auto ctrl_xport = chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 1, // num_send_frames + 1); // num_recv_frames + + // Setup a route to the EPID + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + mgmt_portal.config_local_tx_stream( + *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + + // We no longer need the control xport, release it so + // the control xport is no longer connected to the I/O service. + ctrl_xport.reset(); + + return configure_flow_ctrl(io_srv, + recv_link, + send_link, + pkt_factory, + epids, + fc_freq_ratio, + fc_headroom_ratio); +} |