aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/chdr_rx_data_xport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/chdr_rx_data_xport.cpp')
-rw-r--r--host/lib/rfnoc/chdr_rx_data_xport.cpp203
1 files changed, 203 insertions, 0 deletions
diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp
new file mode 100644
index 000000000..bcd9f7ea9
--- /dev/null
+++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp
@@ -0,0 +1,203 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp>
+#include <uhdlib/rfnoc/chdr_types.hpp>
+#include <uhdlib/rfnoc/mgmt_portal.hpp>
+#include <uhdlib/rfnoc/rfnoc_common.hpp>
+#include <uhdlib/transport/io_service.hpp>
+#include <uhdlib/transport/link_if.hpp>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::detail;
+using namespace uhd::transport;
+
+rx_flow_ctrl_sender::rx_flow_ctrl_sender(
+ const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids)
+ : _dst_epid(sep_ids.first)
+{
+ _fc_packet = pkt_factory.make_strs();
+ _fc_strs_pyld.src_epid = sep_ids.second;
+}
+
+void rx_flow_ctrl_sender::set_capacity(const stream_buff_params_t& recv_capacity)
+{
+ _fc_strs_pyld.capacity_bytes = recv_capacity.bytes;
+ _fc_strs_pyld.capacity_pkts = recv_capacity.packets;
+}
+
+chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv,
+ uhd::transport::recv_link_if::sptr recv_link,
+ uhd::transport::send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ const uhd::rfnoc::sep_id_pair_t& epids,
+ const size_t num_recv_frames,
+ const fc_params_t& fc_params)
+ : _fc_state(epids, fc_params.freq)
+ , _fc_sender(pkt_factory, epids)
+ , _epid(epids.second)
+{
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Creating rx xport with local epid=" << epids.second
+ << ", remote epid=" << epids.first);
+
+ _recv_packet = pkt_factory.make_generic();
+ _fc_sender.set_capacity(fc_params.buff_capacity);
+
+ // Calculate max payload size
+ const size_t pyld_offset =
+ _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS);
+ _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset;
+
+ // Make data transport
+ auto recv_cb =
+ [this](buff_t::uptr& buff, recv_link_if* recv_link, send_link_if* send_link) {
+ return this->_recv_callback(buff, recv_link, send_link);
+ };
+
+ auto fc_cb =
+ [this](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) {
+ this->_fc_callback(std::move(buff), recv_link, send_link);
+ };
+
+ // Needs just a single send frame for responses
+ _recv_io = io_srv->make_recv_client(recv_link,
+ num_recv_frames,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Stream endpoint was configured with:"
+ << std::endl
+ << "capacity bytes=" << fc_params.buff_capacity.bytes
+ << ", packets=" << fc_params.buff_capacity.packets << std::endl
+ << "fc frequency bytes=" << fc_params.freq.bytes
+ << ", packets=" << fc_params.freq.packets);
+}
+
+chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv,
+ recv_link_if::sptr recv_link,
+ send_link_if::sptr send_link,
+ const chdr::chdr_packet_factory& pkt_factory,
+ mgmt::mgmt_portal& mgmt_portal,
+ const sep_id_pair_t& epids,
+ const sw_buff_t pyld_buff_fmt,
+ const sw_buff_t mdata_buff_fmt,
+ const stream_buff_params_t& recv_capacity,
+ const stream_buff_params_t& fc_freq,
+ const stream_buff_params_t& fc_headroom,
+ const bool lossy_xport)
+{
+ const sep_id_t remote_epid = epids.first;
+ const sep_id_t local_epid = epids.second;
+
+ rx_flow_ctrl_sender fc_sender(pkt_factory, epids);
+ chdr::chdr_packet::uptr pkt = pkt_factory.make_generic();
+ fc_sender.set_capacity(recv_capacity);
+ chdr::strc_payload strc;
+
+ auto recv_cb = [&pkt, local_epid, &strc](buff_t::uptr& buff,
+ recv_link_if* /*recv_link*/,
+ send_link_if* /*send_link*/) {
+ pkt->refresh(buff->data());
+ const auto header = pkt->get_chdr_header();
+ const auto dst_epid = header.get_dst_epid();
+
+ if (dst_epid != local_epid) {
+ return false;
+ }
+
+ const auto type = header.get_pkt_type();
+
+ if (type != chdr::PKT_TYPE_STRC) {
+ return false;
+ }
+
+ strc.deserialize(pkt->get_payload_const_ptr_as<uint64_t>(),
+ pkt->get_payload_size() / sizeof(uint64_t),
+ pkt->conv_to_host<uint64_t>());
+
+ if (strc.op_code != chdr::STRC_INIT) {
+ throw uhd::value_error("Unexpected opcode value in STRC packet.");
+ }
+
+ return true;
+ };
+
+ auto fc_cb = [&fc_sender](buff_t::uptr buff,
+ recv_link_if* recv_link,
+ send_link_if* send_link) {
+ recv_link->release_recv_buff(std::move(buff));
+
+ // Send a strs response to configure flow control on the sender. The
+ // byte and packet counts are not important since they are reset by
+ // the stream endpoint on receipt of this packet.
+ fc_sender.send_strs(send_link, {0, 0});
+ };
+
+ // Create a temporary recv_io to receive the strc init
+ auto recv_io = io_srv->make_recv_client(recv_link,
+ /* num_recv_frames*/ 1,
+ recv_cb,
+ send_link,
+ /* num_send_frames*/ 1,
+ fc_cb);
+
+ // Create a control transport with the rx data links to send mgmt packets
+ // needed to setup the stream
+ // Piggyback on frames from the recv_io_if
+ auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv,
+ send_link,
+ recv_link,
+ pkt_factory,
+ local_epid,
+ 0, // num_send_frames
+ 0); // num_recv_frames
+
+ // Setup a route to the EPID
+ // Note that this may be gratuitous--The endpoint may already have been set up
+ mgmt_portal.setup_local_route(*ctrl_xport, remote_epid);
+
+ // Initialize flow control - first, the management portal sends a stream
+ // command containing its requested flow control frequency
+ mgmt_portal.config_local_rx_stream_start(*ctrl_xport,
+ remote_epid,
+ lossy_xport,
+ pyld_buff_fmt,
+ mdata_buff_fmt,
+ fc_freq,
+ fc_headroom);
+
+ // Now, release the buffer. In the flow control callback for the recv_io
+ // (fc_cb above), we send a stream status containing the xport buffer
+ // capacity.
+ auto buff = recv_io->get_recv_buff(100);
+ if (!buff) {
+ throw uhd::runtime_error(
+ "rx xport timed out getting a response from mgmt_portal");
+ }
+ recv_io->release_recv_buff(std::move(buff));
+
+ // Finally, let the management portal know the setup is complete
+ mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid);
+
+ // The flow control frequency requested is contained in the strc
+ UHD_LOG_TRACE("XPORT::RX_DATA_XPORT",
+ "Received strc init with fc freq"
+ << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts);
+
+ fc_params_t fc_params;
+ fc_params.buff_capacity = recv_capacity;
+ fc_params.freq = {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)};
+
+ recv_io.reset();
+ ctrl_xport.reset();
+
+ return fc_params;
+}