From 0e2464ad888230054b04a4f3fb192ea8dc5721b0 Mon Sep 17 00:00:00 2001 From: Ciro Nishiguchi Date: Tue, 27 Aug 2019 16:08:05 -0500 Subject: rfnoc: Move data xport sep configuration to static methods Move the configuration logic for stream endpoints to static methods of the chdr data transports. This separates those interactions from the main transport code, simplifying both. It also makes it easier to use the transports with mock link objects. --- host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp | 17 +- .../include/uhdlib/rfnoc/chdr_rx_data_xport.hpp | 175 ++++------- .../include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 323 ++++----------------- .../include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp | 17 +- .../include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp | 9 +- host/lib/rfnoc/CMakeLists.txt | 2 + host/lib/rfnoc/chdr_ctrl_xport.cpp | 19 +- host/lib/rfnoc/chdr_rx_data_xport.cpp | 203 +++++++++++++ host/lib/rfnoc/chdr_tx_data_xport.cpp | 249 ++++++++++++++++ 9 files changed, 597 insertions(+), 417 deletions(-) create mode 100644 host/lib/rfnoc/chdr_rx_data_xport.cpp create mode 100644 host/lib/rfnoc/chdr_tx_data_xport.cpp diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp index 9cf2f305f..726ea7f6c 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_RFNOC_CHDR_CTRL_XPORT_HPP #define INCLUDED_RFNOC_CHDR_CTRL_XPORT_HPP +#include #include #include #include @@ -41,12 +42,18 @@ public: static sptr make(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames) { - return std::make_shared( - io_srv, send_link, recv_link, my_epid, num_send_frames, num_recv_frames); + return std::make_shared(io_srv, + send_link, + recv_link, + pkt_factory, + my_epid, + num_send_frames, + num_recv_frames); } /*! @@ -62,6 +69,7 @@ public: chdr_ctrl_xport(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames); @@ -122,10 +130,15 @@ private: chdr_ctrl_xport(const chdr_ctrl_xport&) = delete; sep_id_t _my_epid; + + // Packet for received data + chdr::chdr_packet::uptr _recv_packet; + send_io_if::sptr _send_if; recv_io_if::sptr _ctrl_recv_if; recv_io_if::sptr _mgmt_recv_if; + // FIXME: Remove this when have threaded_io_service std::mutex _mutex; }; diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp index 41af766b4..9c5d88066 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -8,9 +8,9 @@ #define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP #include +#include #include #include -#include #include #include #include @@ -19,6 +19,10 @@ namespace uhd { namespace rfnoc { +namespace mgmt { +class mgmt_portal; +} + namespace detail { /*! @@ -29,21 +33,12 @@ class rx_flow_ctrl_sender public: //! Constructor rx_flow_ctrl_sender( - const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) - : _dst_epid(sep_ids.first) - { - _fc_packet = pkt_factory.make_strs(); - _fc_strs_pyld.src_epid = sep_ids.second; - } + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids); - /*! Configure buffer capacity + /*! configure buffer capacity * \param recv_capacity The buffer capacity of the receive link */ - void set_capacity(const stream_buff_params_t& recv_capacity) - { - _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; - _fc_strs_pyld.capacity_pkts = recv_capacity.packets; - } + void set_capacity(const stream_buff_params_t& recv_capacity); /*! Send a flow control response packet * @@ -119,113 +114,59 @@ public: const void* payload = nullptr; }; - /*! Constructor + //! Flow control parameters + struct fc_params_t + { + stream_buff_params_t buff_capacity; + stream_buff_params_t freq; + }; + + /*! Configure stream endpoint route and flow control * * \param io_srv The service that will schedule the xport I/O * \param recv_link The recv link, already attached to the I/O service * \param send_link The send link, already attached to the I/O service * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param mgmt_portal Management portal to configure stream endpoint * \param epids Source and destination endpoint IDs * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param num_recv_frames Num frames to reserve from the recv link * \param recv_capacity Total capacity of the recv link * \param fc_freq Frequency of flow control status messages * \param fc_headroom Headroom for flow control status messages * \param lossy_xport Whether the xport is lossy, for flow control configuration + * \return Parameters for xport flow control */ - chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv, uhd::transport::recv_link_if::sptr recv_link, uhd::transport::send_link_if::sptr send_link, - uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const chdr::chdr_packet_factory& pkt_factory, + uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const uhd::rfnoc::sep_id_pair_t& epids, const uhd::rfnoc::sw_buff_t pyld_buff_fmt, const uhd::rfnoc::sw_buff_t mdata_buff_fmt, - const size_t num_recv_frames, const stream_buff_params_t& recv_capacity, const stream_buff_params_t& fc_freq, const stream_buff_params_t& fc_headroom, - const bool lossy_xport) - : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second) - { - const sep_id_t remote_epid = epids.first; - const sep_id_t local_epid = epids.second; - - UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", - "Creating rx xport with local epid=" << local_epid - << ", remote epid=" << remote_epid); - - _recv_packet = pkt_factory.make_generic(); - _fc_sender.set_capacity(recv_capacity); - - // Calculate max payload size - const size_t pyld_offset = - _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); - _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; - - // Make data transport - auto recv_cb = [this](buff_t::uptr& buff, - transport::recv_link_if* recv_link, - transport::send_link_if* send_link) { - return this->_recv_callback(buff, recv_link, send_link); - }; - - auto fc_cb = [this](buff_t::uptr buff, - transport::recv_link_if* recv_link, - transport::send_link_if* send_link) { - this->_fc_callback(std::move(buff), recv_link, send_link); - }; - - // Needs just a single send frame for responses - _recv_io = io_srv->make_recv_client(recv_link, - num_recv_frames, - recv_cb, - send_link, - /* num_send_frames*/ 1, - fc_cb); - - // Create a control transport with the rx data links to send mgmt packets - // needed to setup the stream - // Piggyback on frames from the recv_io_if - auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, - send_link, - recv_link, - local_epid, - 0, // num_send_frames - 0); // num_recv_frames - - // Setup a route to the EPID - // Note that this may be gratuitous--The endpoint may already have been set up - mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); - - // Initialize flow control - management portal sends a stream command - // containing its requested flow control frequency, the rx transport - // responds with a stream status containing its buffer capacity. - mgmt_portal.config_local_rx_stream_start(*ctrl_xport, - remote_epid, - lossy_xport, - pyld_buff_fmt, - mdata_buff_fmt, - fc_freq, - fc_headroom); - - mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid); - - UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", - "Stream endpoint was configured with:" - << std::endl - << "capacity bytes=" << recv_capacity.bytes - << ", packets=" << recv_capacity.packets << std::endl - << "fc headroom bytes=" << fc_headroom.bytes - << ", packets=" << fc_headroom.packets << std::endl - << "fc frequency bytes=" << fc_freq.bytes - << ", packets=" << fc_freq.packets); - - // We no longer need the control xport, release it so - // the control xport is no longer connected to the I/O service. - ctrl_xport.reset(); - } + const bool lossy_xport); + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param epids Source and destination endpoint IDs + * \param num_recv_frames Num frames to reserve from the recv link + * \param fc_params Parameters for flow control + */ + chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_recv_frames, + const fc_params_t& fc_params); /*! Returns maximum number payload bytes * @@ -286,15 +227,16 @@ private: transport::send_link_if* send_link) { _recv_packet->refresh(buff->data()); - const auto header = _recv_packet->get_chdr_header(); - const auto type = header.get_pkt_type(); - const auto dst_epid = header.get_dst_epid(); - const auto packet_size = buff->packet_size(); + const auto header = _recv_packet->get_chdr_header(); + const auto dst_epid = header.get_dst_epid(); if (dst_epid != _epid) { return false; } + const auto type = header.get_pkt_type(); + const auto packet_size = header.get_length(); + if (type == chdr::PKT_TYPE_STRC) { chdr::strc_payload strc; strc.deserialize(_recv_packet->get_payload_const_ptr_as(), @@ -316,26 +258,6 @@ private: buff = buff_t::uptr(); _fc_state.xfer_done(packet_size); _send_fc_response(send_link); - } else if (strc.op_code == chdr::STRC_INIT) { - _fc_state.initialize( - {strc.num_bytes, static_cast(strc.num_pkts)}); - - UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", - "Received strc init with fc freq" - << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); - - // Make sure flow control was initialized - assert(_fc_state.get_fc_freq().bytes > 0); - assert(_fc_state.get_fc_freq().packets > 0); - - // Send a strs response to configure flow control on the sender - _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); - - // Reset counts, since mgmt_portal will do it to FPGA - _fc_state.reset_counts(); - - recv_link->release_recv_buff(std::move(buff)); - buff = buff_t::uptr(); } else { throw uhd::value_error("Unexpected opcode value in STRC packet."); } @@ -373,7 +295,9 @@ private: transport::recv_link_if* recv_link, transport::send_link_if* send_link) { - const size_t packet_size = buff->packet_size(); + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const size_t packet_size = header.get_length(); recv_link->release_recv_buff(std::move(buff)); _fc_state.xfer_done(packet_size); _send_fc_response(send_link); @@ -393,8 +317,8 @@ private: } /*! - * Checks if the sequence number is out of sequence, prints 'D' if it is - * and returns result of check. + * Checks if the sequence number is out of sequence, increments sequence + * number for next packet. * * \return true if a sequence error occurred */ @@ -404,7 +328,6 @@ private: _data_seq_num = seq_num + 1; if (expected_packet_count != seq_num) { - UHD_LOG_FASTPATH("D"); return true; } return false; diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 63c2b24cb..1e54a2f7a 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -7,10 +7,11 @@ #ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP #define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#include #include +#include #include #include -#include #include #include #include @@ -19,6 +20,10 @@ namespace uhd { namespace rfnoc { +namespace mgmt { +class mgmt_portal; +} + namespace detail { /*! @@ -29,13 +34,7 @@ class tx_flow_ctrl_sender public: //! Constructor tx_flow_ctrl_sender( - const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) - : _dst_epid(sep_ids.second) - { - _fc_packet = pkt_factory.make_strc(); - _fc_strc_pyld.src_epid = sep_ids.first; - _fc_strc_pyld.op_code = chdr::STRC_RESYNC; - } + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids); /*! * Sends a flow control resync packet @@ -104,9 +103,10 @@ private: class chdr_tx_data_xport { public: - using uptr = std::unique_ptr; - using buff_t = transport::frame_buff; - using enqueue_async_msg_fn_t = std::function; + using uptr = std::unique_ptr; + using buff_t = transport::frame_buff; + using enqueue_async_msg_fn_t = + std::function; //! Information about data packet struct packet_info_t @@ -117,84 +117,54 @@ public: size_t payload_bytes = 0; }; - /*! Constructor + //! Flow control parameters + struct fc_params_t + { + stream_buff_params_t buff_capacity; + }; + + /*! Configure route to the sep and flow control * - * \param io_srv The service that will schedule the xport I/O + * \param io_srv The I/O service to be used with the transport * \param recv_link The recv link, already attached to the I/O service * \param send_link The send link, already attached to the I/O service * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param mgmt_portal Management portal to configure stream endpoint * \param epids Source and destination endpoint IDs * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param num_send_frames Num frames to reserve from the send link * \param fc_freq_ratio Ratio to use to configure the device fc frequency * \param fc_headroom_ratio Ratio to use to configure the device fc headroom + * \return Parameters for xport flow control */ - chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv, uhd::transport::recv_link_if::sptr recv_link, uhd::transport::send_link_if::sptr send_link, - uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const chdr::chdr_packet_factory& pkt_factory, + uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const uhd::rfnoc::sep_id_pair_t& epids, const uhd::rfnoc::sw_buff_t pyld_buff_fmt, const uhd::rfnoc::sw_buff_t mdata_buff_fmt, - const size_t num_send_frames, const double fc_freq_ratio, - const double fc_headroom_ratio) - : _fc_sender(pkt_factory, epids), _epid(epids.first) - { - const sep_id_t remote_epid = epids.second; - const sep_id_t local_epid = epids.first; - - UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", - "Creating tx xport with local epid=" << local_epid - << ", remote epid=" << remote_epid); - - _send_header.set_dst_epid(epids.second); - _send_packet = pkt_factory.make_generic(); - _recv_packet = pkt_factory.make_generic(); - - // Calculate max payload size - const size_t pyld_offset = - _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); - _max_payload_size = send_link->get_send_frame_size() - pyld_offset; - - _configure_sep(io_srv, - recv_link, - send_link, - mgmt_portal, - local_epid, - remote_epid, - pyld_buff_fmt, - mdata_buff_fmt); - - _initialize_flow_ctrl(io_srv, - recv_link, - send_link, - pkt_factory, - epids, - fc_freq_ratio, - fc_headroom_ratio); - - // Now create the send I/O we will use for data - auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { - this->_send_callback(buff, send_link); - }; - - auto recv_cb = [this](buff_t::uptr& buff, - transport::recv_link_if* recv_link, - transport::send_link_if* send_link) { - return this->_recv_callback(buff, recv_link, send_link); - }; - - // Needs just a single recv frame for strs packets - _send_io = io_srv->make_send_client(send_link, - num_send_frames, - send_cb, - recv_link, - /* num_recv_frames */ 1, - recv_cb); - } + const double fc_headroom_ratio); + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param epids Source and destination endpoint IDs + * \param num_send_frames Num frames to reserve from the send link + * \param fc_params Parameters for flow control + */ + chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_send_frames, + const fc_params_t fc_params); /*! Returns maximum number of payload bytes * @@ -300,23 +270,27 @@ private: if (strs.status != chdr::STRS_OKAY) { switch (strs.status) { - case chdr::STRS_SEQERR: - UHD_LOG_FASTPATH("S"); - if (_enqueue_async_msg) { - _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); - } - break; - case chdr::STRS_DATAERR: - UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); - break; - case chdr::STRS_RTERR: - UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!"); - break; - case chdr::STRS_CMDERR: - UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!"); - break; - default: - break; + case chdr::STRS_SEQERR: + UHD_LOG_FASTPATH("S"); + if (_enqueue_async_msg) { + _enqueue_async_msg( + async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); + } + break; + case chdr::STRS_DATAERR: + UHD_LOG_WARNING( + "XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); + break; + case chdr::STRS_RTERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", + "Received routing error in tx stream!"); + break; + case chdr::STRS_CMDERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", + "Received command error in tx stream!"); + break; + default: + break; } } @@ -359,177 +333,6 @@ private: } } - /*! - * Configures the stream endpoint using mgmt_portal - */ - void _configure_sep(uhd::transport::io_service::sptr io_srv, - uhd::transport::recv_link_if::sptr recv_link, - uhd::transport::send_link_if::sptr send_link, - uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, - const uhd::rfnoc::sep_id_t& local_epid, - const uhd::rfnoc::sep_id_t& remote_epid, - const uhd::rfnoc::sw_buff_t pyld_buff_fmt, - const uhd::rfnoc::sw_buff_t mdata_buff_fmt) - { - // Create a control transport with the tx data links to send mgmt packets - // needed to setup the stream. Only need one frame for this. - auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, - send_link, - recv_link, - local_epid, - 1, // num_send_frames - 1); // num_recv_frames - - // Setup a route to the EPID - mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); - - mgmt_portal.config_local_tx_stream( - *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); - - // We no longer need the control xport, release it so - // the control xport is no longer connected to the I/O service. - ctrl_xport.reset(); - } - - /*! - * Initializes flow control - * - * To initialize flow control, we need to send an init strc packet, then - * receive a strs containing the stream endpoint ingress buffer size. We - * then repeat this (now that we know the buffer size) to configure the flow - * control frequency. To avoid having this logic within the data packet - * processing flow, we use temporary send and recv I/O instances with - * simple callbacks here. - */ - void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv, - uhd::transport::recv_link_if::sptr recv_link, - uhd::transport::send_link_if::sptr send_link, - const chdr::chdr_packet_factory& pkt_factory, - const sep_id_pair_t sep_ids, - const double fc_freq_ratio, - const double fc_headroom_ratio) - { - // No flow control at initialization, just release all send buffs - auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { - send_link->release_send_buff(std::move(buff)); - buff = nullptr; - }; - - // For recv, just queue strs packets for recv_io to read - auto recv_cb = [this](buff_t::uptr& buff, - transport::recv_link_if* /*recv_link*/, - transport::send_link_if* /*send_link*/) { - _recv_packet->refresh(buff->data()); - const auto header = _recv_packet->get_chdr_header(); - const auto type = header.get_pkt_type(); - const auto dst_epid = header.get_dst_epid(); - - return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS); - }; - - // No flow control at initialization, just release all recv buffs - auto fc_cb = [this](buff_t::uptr buff, - transport::recv_link_if* recv_link, - transport::send_link_if* /*send_link*/) { - recv_link->release_recv_buff(std::move(buff)); - }; - - auto send_io = io_srv->make_send_client(send_link, - 1, // num_send_frames - send_cb, - nullptr, - 0, // num_recv_frames - nullptr); - - auto recv_io = io_srv->make_recv_client(recv_link, - 1, // num_recv_frames - recv_cb, - nullptr, - 0, // num_send_frames - fc_cb); - - chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); - chdr::chdr_packet::uptr& recv_packet = _recv_packet; - - // Function to send a strc init - auto send_strc_init = [&send_io, sep_ids, &strc_packet]( - const stream_buff_params_t fc_freq = {0, 0}) { - transport::frame_buff::uptr buff = send_io->get_send_buff(0); - - if (!buff) { - throw uhd::runtime_error( - "tx xport timed out getting a send buffer for strc init"); - } - - chdr::chdr_header header; - header.set_seq_num(0); - header.set_dst_epid(sep_ids.second); - - chdr::strc_payload strc_pyld; - strc_pyld.src_epid = sep_ids.first; - strc_pyld.op_code = chdr::STRC_INIT; - strc_pyld.num_bytes = fc_freq.bytes; - strc_pyld.num_pkts = fc_freq.packets; - strc_packet->refresh(buff->data(), header, strc_pyld); - - const size_t size = header.get_length(); - buff->set_packet_size(size); - send_io->release_send_buff(std::move(buff)); - }; - - // Function to receive a strs, returns buffer capacity - auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { - transport::frame_buff::uptr buff = recv_io->get_recv_buff(200); - - if (!buff) { - throw uhd::runtime_error( - "tx xport timed out wating for a strs packet during initialization"); - } - - recv_packet->refresh(buff->data()); - UHD_ASSERT_THROW( - recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); - chdr::strs_payload strs; - strs.deserialize(recv_packet->get_payload_const_ptr_as(), - recv_packet->get_payload_size() / sizeof(uint64_t), - recv_packet->conv_to_host()); - - recv_io->release_recv_buff(std::move(buff)); - - return {strs.capacity_bytes, static_cast(strs.capacity_pkts)}; - }; - - // Send a strc init to get the buffer size - send_strc_init(); - stream_buff_params_t capacity = recv_strs(); - _fc_state.set_dest_capacity(capacity); - - UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", - "Received strs initializing buffer capacity to " << capacity.bytes - << " bytes"); - - // Calculate the requested fc_freq parameters - uhd::rfnoc::stream_buff_params_t fc_freq = { - static_cast(std::ceil(double(capacity.bytes) * fc_freq_ratio)), - static_cast(std::ceil(double(capacity.packets) * fc_freq_ratio))}; - - const size_t headroom_bytes = - static_cast(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); - const size_t headroom_packets = static_cast( - std::ceil(double(capacity.packets) * fc_headroom_ratio)); - - fc_freq.bytes -= headroom_bytes; - fc_freq.packets -= headroom_packets; - - // Send a strc init to configure fc freq - send_strc_init(fc_freq); - recv_strs(); - - // Release temporary I/O service interfaces to disconnect from it - send_io.reset(); - recv_io.reset(); - } - // Interface to the I/O service transport::send_io_if::sptr _send_io; diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp index 937baf982..ed6553bf3 100644 --- a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp +++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp @@ -17,12 +17,10 @@ class rx_flow_ctrl_state { public: //! Constructor - rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {} - - //! Initialize frequency parameters - void initialize(const stream_buff_params_t fc_freq) + rx_flow_ctrl_state( + const rfnoc::sep_id_pair_t epids, const stream_buff_params_t fc_freq) + : _fc_freq(fc_freq), _epids(epids) { - _fc_freq = fc_freq; } //! Resynchronize with transfer counts from the sender @@ -50,15 +48,6 @@ public: } } - //! Reset the transfer counts (happens during init) - void reset_counts() - { - UHD_LOGGER_TRACE("rx_flow_ctrl_state") - << "Resetting transfer counts" << std::endl; - _recv_counts = {0, 0}; - _xfer_counts = {0, 0}; - } - //! Update state when data is received void data_received(const size_t bytes) { diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp index 65fc1b093..0005e7584 100644 --- a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp +++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp @@ -15,11 +15,8 @@ namespace uhd { namespace rfnoc { class tx_flow_ctrl_state { public: - //! Updates destination capacity - void set_dest_capacity(const stream_buff_params_t& capacity) - { - _dest_capacity = capacity; - } + //! Constructor + tx_flow_ctrl_state(const stream_buff_params_t& capacity) : _dest_capacity(capacity) {} //! Updates destination received count void update_dest_recv_count(const stream_buff_params_t& recv_count) @@ -67,7 +64,7 @@ public: //! Clears fc resync request pending status void clear_fc_resync_req_pending() { - _fc_resync_req = false; + _fc_resync_req = false; _last_fc_resync_bytes = _xfer_counts.bytes; } diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index 73de394e3..2892e0d6d 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -23,6 +23,8 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/chdr_types.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_packet.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_rx_data_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_tx_data_xport.cpp ${CMAKE_CURRENT_SOURCE_DIR}/client_zero.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ctrl_iface.cpp ${CMAKE_CURRENT_SOURCE_DIR}/device_id.cpp diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp index a6cfda0de..929875dbd 100644 --- a/host/lib/rfnoc/chdr_ctrl_xport.cpp +++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp @@ -16,10 +16,11 @@ using namespace uhd::transport; chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames) - : _my_epid(my_epid) + : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic()) { /* Make dumb send pipe */ send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, @@ -34,10 +35,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_CTRL) && (dst_epid == _my_epid)) { @@ -59,10 +60,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_MGMT) && (dst_epid == _my_epid)) { diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp new file mode 100644 index 000000000..bcd9f7ea9 --- /dev/null +++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp @@ -0,0 +1,203 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include +#include +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +rx_flow_ctrl_sender::rx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.first) +{ + _fc_packet = pkt_factory.make_strs(); + _fc_strs_pyld.src_epid = sep_ids.second; +} + +void rx_flow_ctrl_sender::set_capacity(const stream_buff_params_t& recv_capacity) +{ + _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; + _fc_strs_pyld.capacity_pkts = recv_capacity.packets; +} + +chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_recv_frames, + const fc_params_t& fc_params) + : _fc_state(epids, fc_params.freq) + , _fc_sender(pkt_factory, epids) + , _epid(epids.second) +{ + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Creating rx xport with local epid=" << epids.second + << ", remote epid=" << epids.first); + + _recv_packet = pkt_factory.make_generic(); + _fc_sender.set_capacity(fc_params.buff_capacity); + + // Calculate max payload size + const size_t pyld_offset = + _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + + // Make data transport + auto recv_cb = + [this](buff_t::uptr& buff, recv_link_if* recv_link, send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + auto fc_cb = + [this](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) { + this->_fc_callback(std::move(buff), recv_link, send_link); + }; + + // Needs just a single send frame for responses + _recv_io = io_srv->make_recv_client(recv_link, + num_recv_frames, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Stream endpoint was configured with:" + << std::endl + << "capacity bytes=" << fc_params.buff_capacity.bytes + << ", packets=" << fc_params.buff_capacity.packets << std::endl + << "fc frequency bytes=" << fc_params.freq.bytes + << ", packets=" << fc_params.freq.packets); +} + +chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const stream_buff_params_t& recv_capacity, + const stream_buff_params_t& fc_freq, + const stream_buff_params_t& fc_headroom, + const bool lossy_xport) +{ + const sep_id_t remote_epid = epids.first; + const sep_id_t local_epid = epids.second; + + rx_flow_ctrl_sender fc_sender(pkt_factory, epids); + chdr::chdr_packet::uptr pkt = pkt_factory.make_generic(); + fc_sender.set_capacity(recv_capacity); + chdr::strc_payload strc; + + auto recv_cb = [&pkt, local_epid, &strc](buff_t::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + pkt->refresh(buff->data()); + const auto header = pkt->get_chdr_header(); + const auto dst_epid = header.get_dst_epid(); + + if (dst_epid != local_epid) { + return false; + } + + const auto type = header.get_pkt_type(); + + if (type != chdr::PKT_TYPE_STRC) { + return false; + } + + strc.deserialize(pkt->get_payload_const_ptr_as(), + pkt->get_payload_size() / sizeof(uint64_t), + pkt->conv_to_host()); + + if (strc.op_code != chdr::STRC_INIT) { + throw uhd::value_error("Unexpected opcode value in STRC packet."); + } + + return true; + }; + + auto fc_cb = [&fc_sender](buff_t::uptr buff, + recv_link_if* recv_link, + send_link_if* send_link) { + recv_link->release_recv_buff(std::move(buff)); + + // Send a strs response to configure flow control on the sender. The + // byte and packet counts are not important since they are reset by + // the stream endpoint on receipt of this packet. + fc_sender.send_strs(send_link, {0, 0}); + }; + + // Create a temporary recv_io to receive the strc init + auto recv_io = io_srv->make_recv_client(recv_link, + /* num_recv_frames*/ 1, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + // Create a control transport with the rx data links to send mgmt packets + // needed to setup the stream + // Piggyback on frames from the recv_io_if + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 0, // num_send_frames + 0); // num_recv_frames + + // Setup a route to the EPID + // Note that this may be gratuitous--The endpoint may already have been set up + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + // Initialize flow control - first, the management portal sends a stream + // command containing its requested flow control frequency + mgmt_portal.config_local_rx_stream_start(*ctrl_xport, + remote_epid, + lossy_xport, + pyld_buff_fmt, + mdata_buff_fmt, + fc_freq, + fc_headroom); + + // Now, release the buffer. In the flow control callback for the recv_io + // (fc_cb above), we send a stream status containing the xport buffer + // capacity. + auto buff = recv_io->get_recv_buff(100); + if (!buff) { + throw uhd::runtime_error( + "rx xport timed out getting a response from mgmt_portal"); + } + recv_io->release_recv_buff(std::move(buff)); + + // Finally, let the management portal know the setup is complete + mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid); + + // The flow control frequency requested is contained in the strc + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Received strc init with fc freq" + << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + + fc_params_t fc_params; + fc_params.buff_capacity = recv_capacity; + fc_params.freq = {strc.num_bytes, static_cast(strc.num_pkts)}; + + recv_io.reset(); + ctrl_xport.reset(); + + return fc_params; +} diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp new file mode 100644 index 000000000..cb28c7ac9 --- /dev/null +++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp @@ -0,0 +1,249 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include +#include +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +tx_flow_ctrl_sender::tx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.second) +{ + _fc_packet = pkt_factory.make_strc(); + _fc_strc_pyld.src_epid = sep_ids.first; + _fc_strc_pyld.op_code = chdr::STRC_RESYNC; +} + +chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_send_frames, + const fc_params_t fc_params) + : _fc_state(fc_params.buff_capacity) + , _fc_sender(pkt_factory, epids) + , _epid(epids.first) +{ + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Creating tx xport with local epid=" << epids.first + << ", remote epid=" << epids.second); + + _send_header.set_dst_epid(epids.second); + _send_packet = pkt_factory.make_generic(); + _recv_packet = pkt_factory.make_generic(); + + // Calculate max payload size + const size_t pyld_offset = + _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + + // Now create the send I/O we will use for data + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + this->_send_callback(buff, send_link); + }; + + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + // Needs just a single recv frame for strs packets + _send_io = io_srv->make_send_client(send_link, + num_send_frames, + send_cb, + recv_link, + /* num_recv_frames */ 1, + recv_cb); +} + +/* + * To configure flow control, we need to send an init strc packet, then + * receive a strs containing the stream endpoint ingress buffer size. We + * then repeat this (now that we know the buffer size) to configure the flow + * control frequency. To avoid having this logic within the data packet + * processing flow, we use temporary send and recv I/O instances with + * simple callbacks here. + */ +static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const sep_id_pair_t epids, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); + chdr::chdr_packet::uptr recv_packet = pkt_factory.make_generic(); + + // No flow control at initialization, just release all send buffs + auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + }; + + // For recv, just queue strs packets for recv_io to read + auto recv_cb = [&recv_packet, epids](frame_buff::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + recv_packet->refresh(buff->data()); + const auto header = recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + return (dst_epid == epids.first && type == chdr::PKT_TYPE_STRS); + }; + + // No flow control at initialization, just release all recv buffs + auto fc_cb = + [](frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* /*send_link*/) { + recv_link->release_recv_buff(std::move(buff)); + }; + + auto send_io = io_srv->make_send_client(send_link, + 1, // num_send_frames + send_cb, + nullptr, + 0, // num_recv_frames + nullptr); + + auto recv_io = io_srv->make_recv_client(recv_link, + 1, // num_recv_frames + recv_cb, + nullptr, + 0, // num_send_frames + fc_cb); + + // Function to send a strc init + auto send_strc_init = [&send_io, epids, &strc_packet]( + const stream_buff_params_t fc_freq = {0, 0}) { + frame_buff::uptr buff = send_io->get_send_buff(0); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out getting a send buffer for strc init"); + } + + chdr::chdr_header header; + header.set_seq_num(0); + header.set_dst_epid(epids.second); + + chdr::strc_payload strc_pyld; + strc_pyld.src_epid = epids.first; + strc_pyld.op_code = chdr::STRC_INIT; + strc_pyld.num_bytes = fc_freq.bytes; + strc_pyld.num_pkts = fc_freq.packets; + strc_packet->refresh(buff->data(), header, strc_pyld); + + const size_t size = header.get_length(); + buff->set_packet_size(size); + send_io->release_send_buff(std::move(buff)); + }; + + // Function to receive a strs, returns buffer capacity + auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { + frame_buff::uptr buff = recv_io->get_recv_buff(200); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out wating for a strs packet during initialization"); + } + + recv_packet->refresh(buff->data()); + UHD_ASSERT_THROW( + recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); + chdr::strs_payload strs; + strs.deserialize(recv_packet->get_payload_const_ptr_as(), + recv_packet->get_payload_size() / sizeof(uint64_t), + recv_packet->conv_to_host()); + + recv_io->release_recv_buff(std::move(buff)); + + return {strs.capacity_bytes, static_cast(strs.capacity_pkts)}; + }; + + // Send a strc init to get the buffer size + send_strc_init(); + stream_buff_params_t capacity = recv_strs(); + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Received strs initializing buffer capacity to " << capacity.bytes << " bytes"); + + // Calculate the requested fc_freq parameters + stream_buff_params_t fc_freq = { + static_cast(std::ceil(double(capacity.bytes) * fc_freq_ratio)), + static_cast(std::ceil(double(capacity.packets) * fc_freq_ratio))}; + + const size_t headroom_bytes = + static_cast(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); + const size_t headroom_packets = + static_cast(std::ceil(double(capacity.packets) * fc_headroom_ratio)); + + fc_freq.bytes -= headroom_bytes; + fc_freq.packets -= headroom_packets; + + // Send a strc init to configure fc freq + send_strc_init(fc_freq); + recv_strs(); + + // Release temporary I/O service interfaces to disconnect from it + send_io.reset(); + recv_io.reset(); + + return {capacity}; +} + +chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + const sep_id_t remote_epid = epids.second; + const sep_id_t local_epid = epids.first; + + // Create a control transport with the tx data links to send mgmt packets + // needed to setup the stream. Only need one frame for this. + auto ctrl_xport = chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 1, // num_send_frames + 1); // num_recv_frames + + // Setup a route to the EPID + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + mgmt_portal.config_local_tx_stream( + *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + + // We no longer need the control xport, release it so + // the control xport is no longer connected to the I/O service. + ctrl_xport.reset(); + + return configure_flow_ctrl(io_srv, + recv_link, + send_link, + pkt_factory, + epids, + fc_freq_ratio, + fc_headroom_ratio); +} -- cgit v1.2.3