diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp | 17 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp | 175 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 323 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp | 17 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp | 9 | ||||
-rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_ctrl_xport.cpp | 19 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_rx_data_xport.cpp | 203 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_tx_data_xport.cpp | 249 |
9 files changed, 597 insertions, 417 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp index 9cf2f305f..726ea7f6c 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_xport.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_RFNOC_CHDR_CTRL_XPORT_HPP #define INCLUDED_RFNOC_CHDR_CTRL_XPORT_HPP +#include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/chdr_types.hpp> #include <uhdlib/transport/io_service.hpp> #include <mutex> @@ -41,12 +42,18 @@ public: static sptr make(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames) { - return std::make_shared<chdr_ctrl_xport>( - io_srv, send_link, recv_link, my_epid, num_send_frames, num_recv_frames); + return std::make_shared<chdr_ctrl_xport>(io_srv, + send_link, + recv_link, + pkt_factory, + my_epid, + num_send_frames, + num_recv_frames); } /*! @@ -62,6 +69,7 @@ public: chdr_ctrl_xport(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames); @@ -122,10 +130,15 @@ private: chdr_ctrl_xport(const chdr_ctrl_xport&) = delete; sep_id_t _my_epid; + + // Packet for received data + chdr::chdr_packet::uptr _recv_packet; + send_io_if::sptr _send_if; recv_io_if::sptr _ctrl_recv_if; recv_io_if::sptr _mgmt_recv_if; + // FIXME: Remove this when have threaded_io_service std::mutex _mutex; }; diff --git a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp index 41af766b4..9c5d88066 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_rx_data_xport.hpp @@ -8,9 +8,9 @@ #define INCLUDED_LIBUHD_CHDR_RX_DATA_XPORT_HPP #include <uhd/config.hpp> +#include <uhd/exception.hpp> #include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/chdr_types.hpp> -#include <uhdlib/rfnoc/mgmt_portal.hpp> #include <uhdlib/rfnoc/rfnoc_common.hpp> #include <uhdlib/rfnoc/rx_flow_ctrl_state.hpp> #include <uhdlib/transport/io_service.hpp> @@ -19,6 +19,10 @@ namespace uhd { namespace rfnoc { +namespace mgmt { +class mgmt_portal; +} + namespace detail { /*! @@ -29,21 +33,12 @@ class rx_flow_ctrl_sender public: //! Constructor rx_flow_ctrl_sender( - const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) - : _dst_epid(sep_ids.first) - { - _fc_packet = pkt_factory.make_strs(); - _fc_strs_pyld.src_epid = sep_ids.second; - } + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids); - /*! Configure buffer capacity + /*! configure buffer capacity * \param recv_capacity The buffer capacity of the receive link */ - void set_capacity(const stream_buff_params_t& recv_capacity) - { - _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; - _fc_strs_pyld.capacity_pkts = recv_capacity.packets; - } + void set_capacity(const stream_buff_params_t& recv_capacity); /*! Send a flow control response packet * @@ -119,113 +114,59 @@ public: const void* payload = nullptr; }; - /*! Constructor + //! Flow control parameters + struct fc_params_t + { + stream_buff_params_t buff_capacity; + stream_buff_params_t freq; + }; + + /*! Configure stream endpoint route and flow control * * \param io_srv The service that will schedule the xport I/O * \param recv_link The recv link, already attached to the I/O service * \param send_link The send link, already attached to the I/O service * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param mgmt_portal Management portal to configure stream endpoint * \param epids Source and destination endpoint IDs * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param num_recv_frames Num frames to reserve from the recv link * \param recv_capacity Total capacity of the recv link * \param fc_freq Frequency of flow control status messages * \param fc_headroom Headroom for flow control status messages * \param lossy_xport Whether the xport is lossy, for flow control configuration + * \return Parameters for xport flow control */ - chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv, uhd::transport::recv_link_if::sptr recv_link, uhd::transport::send_link_if::sptr send_link, - uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const chdr::chdr_packet_factory& pkt_factory, + uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const uhd::rfnoc::sep_id_pair_t& epids, const uhd::rfnoc::sw_buff_t pyld_buff_fmt, const uhd::rfnoc::sw_buff_t mdata_buff_fmt, - const size_t num_recv_frames, const stream_buff_params_t& recv_capacity, const stream_buff_params_t& fc_freq, const stream_buff_params_t& fc_headroom, - const bool lossy_xport) - : _fc_state(epids), _fc_sender(pkt_factory, epids), _epid(epids.second) - { - const sep_id_t remote_epid = epids.first; - const sep_id_t local_epid = epids.second; - - UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", - "Creating rx xport with local epid=" << local_epid - << ", remote epid=" << remote_epid); - - _recv_packet = pkt_factory.make_generic(); - _fc_sender.set_capacity(recv_capacity); - - // Calculate max payload size - const size_t pyld_offset = - _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); - _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; - - // Make data transport - auto recv_cb = [this](buff_t::uptr& buff, - transport::recv_link_if* recv_link, - transport::send_link_if* send_link) { - return this->_recv_callback(buff, recv_link, send_link); - }; - - auto fc_cb = [this](buff_t::uptr buff, - transport::recv_link_if* recv_link, - transport::send_link_if* send_link) { - this->_fc_callback(std::move(buff), recv_link, send_link); - }; - - // Needs just a single send frame for responses - _recv_io = io_srv->make_recv_client(recv_link, - num_recv_frames, - recv_cb, - send_link, - /* num_send_frames*/ 1, - fc_cb); - - // Create a control transport with the rx data links to send mgmt packets - // needed to setup the stream - // Piggyback on frames from the recv_io_if - auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, - send_link, - recv_link, - local_epid, - 0, // num_send_frames - 0); // num_recv_frames - - // Setup a route to the EPID - // Note that this may be gratuitous--The endpoint may already have been set up - mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); - - // Initialize flow control - management portal sends a stream command - // containing its requested flow control frequency, the rx transport - // responds with a stream status containing its buffer capacity. - mgmt_portal.config_local_rx_stream_start(*ctrl_xport, - remote_epid, - lossy_xport, - pyld_buff_fmt, - mdata_buff_fmt, - fc_freq, - fc_headroom); - - mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid); - - UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", - "Stream endpoint was configured with:" - << std::endl - << "capacity bytes=" << recv_capacity.bytes - << ", packets=" << recv_capacity.packets << std::endl - << "fc headroom bytes=" << fc_headroom.bytes - << ", packets=" << fc_headroom.packets << std::endl - << "fc frequency bytes=" << fc_freq.bytes - << ", packets=" << fc_freq.packets); - - // We no longer need the control xport, release it so - // the control xport is no longer connected to the I/O service. - ctrl_xport.reset(); - } + const bool lossy_xport); + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param epids Source and destination endpoint IDs + * \param num_recv_frames Num frames to reserve from the recv link + * \param fc_params Parameters for flow control + */ + chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_recv_frames, + const fc_params_t& fc_params); /*! Returns maximum number payload bytes * @@ -286,15 +227,16 @@ private: transport::send_link_if* send_link) { _recv_packet->refresh(buff->data()); - const auto header = _recv_packet->get_chdr_header(); - const auto type = header.get_pkt_type(); - const auto dst_epid = header.get_dst_epid(); - const auto packet_size = buff->packet_size(); + const auto header = _recv_packet->get_chdr_header(); + const auto dst_epid = header.get_dst_epid(); if (dst_epid != _epid) { return false; } + const auto type = header.get_pkt_type(); + const auto packet_size = header.get_length(); + if (type == chdr::PKT_TYPE_STRC) { chdr::strc_payload strc; strc.deserialize(_recv_packet->get_payload_const_ptr_as<uint64_t>(), @@ -316,26 +258,6 @@ private: buff = buff_t::uptr(); _fc_state.xfer_done(packet_size); _send_fc_response(send_link); - } else if (strc.op_code == chdr::STRC_INIT) { - _fc_state.initialize( - {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}); - - UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", - "Received strc init with fc freq" - << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); - - // Make sure flow control was initialized - assert(_fc_state.get_fc_freq().bytes > 0); - assert(_fc_state.get_fc_freq().packets > 0); - - // Send a strs response to configure flow control on the sender - _fc_sender.send_strs(send_link, _fc_state.get_xfer_counts()); - - // Reset counts, since mgmt_portal will do it to FPGA - _fc_state.reset_counts(); - - recv_link->release_recv_buff(std::move(buff)); - buff = buff_t::uptr(); } else { throw uhd::value_error("Unexpected opcode value in STRC packet."); } @@ -373,7 +295,9 @@ private: transport::recv_link_if* recv_link, transport::send_link_if* send_link) { - const size_t packet_size = buff->packet_size(); + _recv_packet->refresh(buff->data()); + const auto header = _recv_packet->get_chdr_header(); + const size_t packet_size = header.get_length(); recv_link->release_recv_buff(std::move(buff)); _fc_state.xfer_done(packet_size); _send_fc_response(send_link); @@ -393,8 +317,8 @@ private: } /*! - * Checks if the sequence number is out of sequence, prints 'D' if it is - * and returns result of check. + * Checks if the sequence number is out of sequence, increments sequence + * number for next packet. * * \return true if a sequence error occurred */ @@ -404,7 +328,6 @@ private: _data_seq_num = seq_num + 1; if (expected_packet_count != seq_num) { - UHD_LOG_FASTPATH("D"); return true; } return false; diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 63c2b24cb..1e54a2f7a 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -7,10 +7,11 @@ #ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP #define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#include <uhd/exception.hpp> #include <uhd/types/metadata.hpp> +#include <uhd/utils/log.hpp> #include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/chdr_types.hpp> -#include <uhdlib/rfnoc/mgmt_portal.hpp> #include <uhdlib/rfnoc/rfnoc_common.hpp> #include <uhdlib/rfnoc/tx_flow_ctrl_state.hpp> #include <uhdlib/transport/io_service.hpp> @@ -19,6 +20,10 @@ namespace uhd { namespace rfnoc { +namespace mgmt { +class mgmt_portal; +} + namespace detail { /*! @@ -29,13 +34,7 @@ class tx_flow_ctrl_sender public: //! Constructor tx_flow_ctrl_sender( - const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) - : _dst_epid(sep_ids.second) - { - _fc_packet = pkt_factory.make_strc(); - _fc_strc_pyld.src_epid = sep_ids.first; - _fc_strc_pyld.op_code = chdr::STRC_RESYNC; - } + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids); /*! * Sends a flow control resync packet @@ -104,9 +103,10 @@ private: class chdr_tx_data_xport { public: - using uptr = std::unique_ptr<chdr_tx_data_xport>; - using buff_t = transport::frame_buff; - using enqueue_async_msg_fn_t = std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>; + using uptr = std::unique_ptr<chdr_tx_data_xport>; + using buff_t = transport::frame_buff; + using enqueue_async_msg_fn_t = + std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>; //! Information about data packet struct packet_info_t @@ -117,84 +117,54 @@ public: size_t payload_bytes = 0; }; - /*! Constructor + //! Flow control parameters + struct fc_params_t + { + stream_buff_params_t buff_capacity; + }; + + /*! Configure route to the sep and flow control * - * \param io_srv The service that will schedule the xport I/O + * \param io_srv The I/O service to be used with the transport * \param recv_link The recv link, already attached to the I/O service * \param send_link The send link, already attached to the I/O service * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param mgmt_portal Management portal to configure stream endpoint * \param epids Source and destination endpoint IDs * \param pyld_buff_fmt Datatype of SW buffer that holds the data payload * \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata - * \param num_send_frames Num frames to reserve from the send link * \param fc_freq_ratio Ratio to use to configure the device fc frequency * \param fc_headroom_ratio Ratio to use to configure the device fc headroom + * \return Parameters for xport flow control */ - chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + static fc_params_t configure_sep(uhd::transport::io_service::sptr io_srv, uhd::transport::recv_link_if::sptr recv_link, uhd::transport::send_link_if::sptr send_link, - uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const chdr::chdr_packet_factory& pkt_factory, + uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, const uhd::rfnoc::sep_id_pair_t& epids, const uhd::rfnoc::sw_buff_t pyld_buff_fmt, const uhd::rfnoc::sw_buff_t mdata_buff_fmt, - const size_t num_send_frames, const double fc_freq_ratio, - const double fc_headroom_ratio) - : _fc_sender(pkt_factory, epids), _epid(epids.first) - { - const sep_id_t remote_epid = epids.second; - const sep_id_t local_epid = epids.first; - - UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", - "Creating tx xport with local epid=" << local_epid - << ", remote epid=" << remote_epid); - - _send_header.set_dst_epid(epids.second); - _send_packet = pkt_factory.make_generic(); - _recv_packet = pkt_factory.make_generic(); - - // Calculate max payload size - const size_t pyld_offset = - _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); - _max_payload_size = send_link->get_send_frame_size() - pyld_offset; - - _configure_sep(io_srv, - recv_link, - send_link, - mgmt_portal, - local_epid, - remote_epid, - pyld_buff_fmt, - mdata_buff_fmt); - - _initialize_flow_ctrl(io_srv, - recv_link, - send_link, - pkt_factory, - epids, - fc_freq_ratio, - fc_headroom_ratio); - - // Now create the send I/O we will use for data - auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { - this->_send_callback(buff, send_link); - }; - - auto recv_cb = [this](buff_t::uptr& buff, - transport::recv_link_if* recv_link, - transport::send_link_if* send_link) { - return this->_recv_callback(buff, recv_link, send_link); - }; - - // Needs just a single recv frame for strs packets - _send_io = io_srv->make_send_client(send_link, - num_send_frames, - send_cb, - recv_link, - /* num_recv_frames */ 1, - recv_cb); - } + const double fc_headroom_ratio); + + /*! Constructor + * + * \param io_srv The service that will schedule the xport I/O + * \param recv_link The recv link, already attached to the I/O service + * \param send_link The send link, already attached to the I/O service + * \param pkt_factory Factory to create packets with the desired chdr_w and endianness + * \param epids Source and destination endpoint IDs + * \param num_send_frames Num frames to reserve from the send link + * \param fc_params Parameters for flow control + */ + chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_send_frames, + const fc_params_t fc_params); /*! Returns maximum number of payload bytes * @@ -300,23 +270,27 @@ private: if (strs.status != chdr::STRS_OKAY) { switch (strs.status) { - case chdr::STRS_SEQERR: - UHD_LOG_FASTPATH("S"); - if (_enqueue_async_msg) { - _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); - } - break; - case chdr::STRS_DATAERR: - UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); - break; - case chdr::STRS_RTERR: - UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!"); - break; - case chdr::STRS_CMDERR: - UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!"); - break; - default: - break; + case chdr::STRS_SEQERR: + UHD_LOG_FASTPATH("S"); + if (_enqueue_async_msg) { + _enqueue_async_msg( + async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); + } + break; + case chdr::STRS_DATAERR: + UHD_LOG_WARNING( + "XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); + break; + case chdr::STRS_RTERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", + "Received routing error in tx stream!"); + break; + case chdr::STRS_CMDERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", + "Received command error in tx stream!"); + break; + default: + break; } } @@ -359,177 +333,6 @@ private: } } - /*! - * Configures the stream endpoint using mgmt_portal - */ - void _configure_sep(uhd::transport::io_service::sptr io_srv, - uhd::transport::recv_link_if::sptr recv_link, - uhd::transport::send_link_if::sptr send_link, - uhd::rfnoc::mgmt::mgmt_portal& mgmt_portal, - const uhd::rfnoc::sep_id_t& local_epid, - const uhd::rfnoc::sep_id_t& remote_epid, - const uhd::rfnoc::sw_buff_t pyld_buff_fmt, - const uhd::rfnoc::sw_buff_t mdata_buff_fmt) - { - // Create a control transport with the tx data links to send mgmt packets - // needed to setup the stream. Only need one frame for this. - auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, - send_link, - recv_link, - local_epid, - 1, // num_send_frames - 1); // num_recv_frames - - // Setup a route to the EPID - mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); - - mgmt_portal.config_local_tx_stream( - *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); - - // We no longer need the control xport, release it so - // the control xport is no longer connected to the I/O service. - ctrl_xport.reset(); - } - - /*! - * Initializes flow control - * - * To initialize flow control, we need to send an init strc packet, then - * receive a strs containing the stream endpoint ingress buffer size. We - * then repeat this (now that we know the buffer size) to configure the flow - * control frequency. To avoid having this logic within the data packet - * processing flow, we use temporary send and recv I/O instances with - * simple callbacks here. - */ - void _initialize_flow_ctrl(uhd::transport::io_service::sptr io_srv, - uhd::transport::recv_link_if::sptr recv_link, - uhd::transport::send_link_if::sptr send_link, - const chdr::chdr_packet_factory& pkt_factory, - const sep_id_pair_t sep_ids, - const double fc_freq_ratio, - const double fc_headroom_ratio) - { - // No flow control at initialization, just release all send buffs - auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { - send_link->release_send_buff(std::move(buff)); - buff = nullptr; - }; - - // For recv, just queue strs packets for recv_io to read - auto recv_cb = [this](buff_t::uptr& buff, - transport::recv_link_if* /*recv_link*/, - transport::send_link_if* /*send_link*/) { - _recv_packet->refresh(buff->data()); - const auto header = _recv_packet->get_chdr_header(); - const auto type = header.get_pkt_type(); - const auto dst_epid = header.get_dst_epid(); - - return (dst_epid == _epid && type == chdr::PKT_TYPE_STRS); - }; - - // No flow control at initialization, just release all recv buffs - auto fc_cb = [this](buff_t::uptr buff, - transport::recv_link_if* recv_link, - transport::send_link_if* /*send_link*/) { - recv_link->release_recv_buff(std::move(buff)); - }; - - auto send_io = io_srv->make_send_client(send_link, - 1, // num_send_frames - send_cb, - nullptr, - 0, // num_recv_frames - nullptr); - - auto recv_io = io_srv->make_recv_client(recv_link, - 1, // num_recv_frames - recv_cb, - nullptr, - 0, // num_send_frames - fc_cb); - - chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); - chdr::chdr_packet::uptr& recv_packet = _recv_packet; - - // Function to send a strc init - auto send_strc_init = [&send_io, sep_ids, &strc_packet]( - const stream_buff_params_t fc_freq = {0, 0}) { - transport::frame_buff::uptr buff = send_io->get_send_buff(0); - - if (!buff) { - throw uhd::runtime_error( - "tx xport timed out getting a send buffer for strc init"); - } - - chdr::chdr_header header; - header.set_seq_num(0); - header.set_dst_epid(sep_ids.second); - - chdr::strc_payload strc_pyld; - strc_pyld.src_epid = sep_ids.first; - strc_pyld.op_code = chdr::STRC_INIT; - strc_pyld.num_bytes = fc_freq.bytes; - strc_pyld.num_pkts = fc_freq.packets; - strc_packet->refresh(buff->data(), header, strc_pyld); - - const size_t size = header.get_length(); - buff->set_packet_size(size); - send_io->release_send_buff(std::move(buff)); - }; - - // Function to receive a strs, returns buffer capacity - auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { - transport::frame_buff::uptr buff = recv_io->get_recv_buff(200); - - if (!buff) { - throw uhd::runtime_error( - "tx xport timed out wating for a strs packet during initialization"); - } - - recv_packet->refresh(buff->data()); - UHD_ASSERT_THROW( - recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); - chdr::strs_payload strs; - strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(), - recv_packet->get_payload_size() / sizeof(uint64_t), - recv_packet->conv_to_host<uint64_t>()); - - recv_io->release_recv_buff(std::move(buff)); - - return {strs.capacity_bytes, static_cast<uint32_t>(strs.capacity_pkts)}; - }; - - // Send a strc init to get the buffer size - send_strc_init(); - stream_buff_params_t capacity = recv_strs(); - _fc_state.set_dest_capacity(capacity); - - UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", - "Received strs initializing buffer capacity to " << capacity.bytes - << " bytes"); - - // Calculate the requested fc_freq parameters - uhd::rfnoc::stream_buff_params_t fc_freq = { - static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)), - static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_freq_ratio))}; - - const size_t headroom_bytes = - static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); - const size_t headroom_packets = static_cast<uint32_t>( - std::ceil(double(capacity.packets) * fc_headroom_ratio)); - - fc_freq.bytes -= headroom_bytes; - fc_freq.packets -= headroom_packets; - - // Send a strc init to configure fc freq - send_strc_init(fc_freq); - recv_strs(); - - // Release temporary I/O service interfaces to disconnect from it - send_io.reset(); - recv_io.reset(); - } - // Interface to the I/O service transport::send_io_if::sptr _send_io; diff --git a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp index 937baf982..ed6553bf3 100644 --- a/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp +++ b/host/lib/include/uhdlib/rfnoc/rx_flow_ctrl_state.hpp @@ -17,12 +17,10 @@ class rx_flow_ctrl_state { public: //! Constructor - rx_flow_ctrl_state(const rfnoc::sep_id_pair_t epids) : _epids(epids) {} - - //! Initialize frequency parameters - void initialize(const stream_buff_params_t fc_freq) + rx_flow_ctrl_state( + const rfnoc::sep_id_pair_t epids, const stream_buff_params_t fc_freq) + : _fc_freq(fc_freq), _epids(epids) { - _fc_freq = fc_freq; } //! Resynchronize with transfer counts from the sender @@ -50,15 +48,6 @@ public: } } - //! Reset the transfer counts (happens during init) - void reset_counts() - { - UHD_LOGGER_TRACE("rx_flow_ctrl_state") - << "Resetting transfer counts" << std::endl; - _recv_counts = {0, 0}; - _xfer_counts = {0, 0}; - } - //! Update state when data is received void data_received(const size_t bytes) { diff --git a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp index 65fc1b093..0005e7584 100644 --- a/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp +++ b/host/lib/include/uhdlib/rfnoc/tx_flow_ctrl_state.hpp @@ -15,11 +15,8 @@ namespace uhd { namespace rfnoc { class tx_flow_ctrl_state { public: - //! Updates destination capacity - void set_dest_capacity(const stream_buff_params_t& capacity) - { - _dest_capacity = capacity; - } + //! Constructor + tx_flow_ctrl_state(const stream_buff_params_t& capacity) : _dest_capacity(capacity) {} //! Updates destination received count void update_dest_recv_count(const stream_buff_params_t& recv_count) @@ -67,7 +64,7 @@ public: //! Clears fc resync request pending status void clear_fc_resync_req_pending() { - _fc_resync_req = false; + _fc_resync_req = false; _last_fc_resync_bytes = _xfer_counts.bytes; } diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index 73de394e3..2892e0d6d 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -23,6 +23,8 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/chdr_types.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_packet.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_rx_data_xport.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_tx_data_xport.cpp ${CMAKE_CURRENT_SOURCE_DIR}/client_zero.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ctrl_iface.cpp ${CMAKE_CURRENT_SOURCE_DIR}/device_id.cpp diff --git a/host/lib/rfnoc/chdr_ctrl_xport.cpp b/host/lib/rfnoc/chdr_ctrl_xport.cpp index a6cfda0de..929875dbd 100644 --- a/host/lib/rfnoc/chdr_ctrl_xport.cpp +++ b/host/lib/rfnoc/chdr_ctrl_xport.cpp @@ -16,10 +16,11 @@ using namespace uhd::transport; chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, send_link_if::sptr send_link, recv_link_if::sptr recv_link, + const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid, size_t num_send_frames, size_t num_recv_frames) - : _my_epid(my_epid) + : _my_epid(my_epid), _recv_packet(pkt_factory.make_generic()) { /* Make dumb send pipe */ send_io_if::send_callback_t send_cb = [this](frame_buff::uptr& buff, @@ -34,10 +35,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx<uint64_t>(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_CTRL) && (dst_epid == _my_epid)) { @@ -59,10 +60,10 @@ chdr_ctrl_xport::chdr_ctrl_xport(io_service::sptr io_srv, recv_link_if* /*recv_link*/, send_link_if * /*send_link*/) -> bool { - uint64_t* data = (uint64_t*)buff->data(); - auto hdr = chdr_header(uhd::ntohx<uint64_t>(*data)); - auto pkt_type = hdr.get_pkt_type(); - auto dst_epid = hdr.get_dst_epid(); + _recv_packet->refresh(buff->data()); + auto hdr = _recv_packet->get_chdr_header(); + auto pkt_type = hdr.get_pkt_type(); + auto dst_epid = hdr.get_dst_epid(); /* Check type and destination EPID */ if ((pkt_type == PKT_TYPE_MGMT) && (dst_epid == _my_epid)) { diff --git a/host/lib/rfnoc/chdr_rx_data_xport.cpp b/host/lib/rfnoc/chdr_rx_data_xport.cpp new file mode 100644 index 000000000..bcd9f7ea9 --- /dev/null +++ b/host/lib/rfnoc/chdr_rx_data_xport.cpp @@ -0,0 +1,203 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/chdr_rx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +rx_flow_ctrl_sender::rx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.first) +{ + _fc_packet = pkt_factory.make_strs(); + _fc_strs_pyld.src_epid = sep_ids.second; +} + +void rx_flow_ctrl_sender::set_capacity(const stream_buff_params_t& recv_capacity) +{ + _fc_strs_pyld.capacity_bytes = recv_capacity.bytes; + _fc_strs_pyld.capacity_pkts = recv_capacity.packets; +} + +chdr_rx_data_xport::chdr_rx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_recv_frames, + const fc_params_t& fc_params) + : _fc_state(epids, fc_params.freq) + , _fc_sender(pkt_factory, epids) + , _epid(epids.second) +{ + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Creating rx xport with local epid=" << epids.second + << ", remote epid=" << epids.first); + + _recv_packet = pkt_factory.make_generic(); + _fc_sender.set_capacity(fc_params.buff_capacity); + + // Calculate max payload size + const size_t pyld_offset = + _recv_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = recv_link->get_recv_frame_size() - pyld_offset; + + // Make data transport + auto recv_cb = + [this](buff_t::uptr& buff, recv_link_if* recv_link, send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + auto fc_cb = + [this](buff_t::uptr buff, recv_link_if* recv_link, send_link_if* send_link) { + this->_fc_callback(std::move(buff), recv_link, send_link); + }; + + // Needs just a single send frame for responses + _recv_io = io_srv->make_recv_client(recv_link, + num_recv_frames, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Stream endpoint was configured with:" + << std::endl + << "capacity bytes=" << fc_params.buff_capacity.bytes + << ", packets=" << fc_params.buff_capacity.packets << std::endl + << "fc frequency bytes=" << fc_params.freq.bytes + << ", packets=" << fc_params.freq.packets); +} + +chdr_rx_data_xport::fc_params_t chdr_rx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const stream_buff_params_t& recv_capacity, + const stream_buff_params_t& fc_freq, + const stream_buff_params_t& fc_headroom, + const bool lossy_xport) +{ + const sep_id_t remote_epid = epids.first; + const sep_id_t local_epid = epids.second; + + rx_flow_ctrl_sender fc_sender(pkt_factory, epids); + chdr::chdr_packet::uptr pkt = pkt_factory.make_generic(); + fc_sender.set_capacity(recv_capacity); + chdr::strc_payload strc; + + auto recv_cb = [&pkt, local_epid, &strc](buff_t::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + pkt->refresh(buff->data()); + const auto header = pkt->get_chdr_header(); + const auto dst_epid = header.get_dst_epid(); + + if (dst_epid != local_epid) { + return false; + } + + const auto type = header.get_pkt_type(); + + if (type != chdr::PKT_TYPE_STRC) { + return false; + } + + strc.deserialize(pkt->get_payload_const_ptr_as<uint64_t>(), + pkt->get_payload_size() / sizeof(uint64_t), + pkt->conv_to_host<uint64_t>()); + + if (strc.op_code != chdr::STRC_INIT) { + throw uhd::value_error("Unexpected opcode value in STRC packet."); + } + + return true; + }; + + auto fc_cb = [&fc_sender](buff_t::uptr buff, + recv_link_if* recv_link, + send_link_if* send_link) { + recv_link->release_recv_buff(std::move(buff)); + + // Send a strs response to configure flow control on the sender. The + // byte and packet counts are not important since they are reset by + // the stream endpoint on receipt of this packet. + fc_sender.send_strs(send_link, {0, 0}); + }; + + // Create a temporary recv_io to receive the strc init + auto recv_io = io_srv->make_recv_client(recv_link, + /* num_recv_frames*/ 1, + recv_cb, + send_link, + /* num_send_frames*/ 1, + fc_cb); + + // Create a control transport with the rx data links to send mgmt packets + // needed to setup the stream + // Piggyback on frames from the recv_io_if + auto ctrl_xport = uhd::rfnoc::chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 0, // num_send_frames + 0); // num_recv_frames + + // Setup a route to the EPID + // Note that this may be gratuitous--The endpoint may already have been set up + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + // Initialize flow control - first, the management portal sends a stream + // command containing its requested flow control frequency + mgmt_portal.config_local_rx_stream_start(*ctrl_xport, + remote_epid, + lossy_xport, + pyld_buff_fmt, + mdata_buff_fmt, + fc_freq, + fc_headroom); + + // Now, release the buffer. In the flow control callback for the recv_io + // (fc_cb above), we send a stream status containing the xport buffer + // capacity. + auto buff = recv_io->get_recv_buff(100); + if (!buff) { + throw uhd::runtime_error( + "rx xport timed out getting a response from mgmt_portal"); + } + recv_io->release_recv_buff(std::move(buff)); + + // Finally, let the management portal know the setup is complete + mgmt_portal.config_local_rx_stream_commit(*ctrl_xport, remote_epid); + + // The flow control frequency requested is contained in the strc + UHD_LOG_TRACE("XPORT::RX_DATA_XPORT", + "Received strc init with fc freq" + << " bytes=" << strc.num_bytes << ", packets=" << strc.num_pkts); + + fc_params_t fc_params; + fc_params.buff_capacity = recv_capacity; + fc_params.freq = {strc.num_bytes, static_cast<uint32_t>(strc.num_pkts)}; + + recv_io.reset(); + ctrl_xport.reset(); + + return fc_params; +} diff --git a/host/lib/rfnoc/chdr_tx_data_xport.cpp b/host/lib/rfnoc/chdr_tx_data_xport.cpp new file mode 100644 index 000000000..cb28c7ac9 --- /dev/null +++ b/host/lib/rfnoc/chdr_tx_data_xport.cpp @@ -0,0 +1,249 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/rfnoc/chdr_types.hpp> +#include <uhdlib/rfnoc/mgmt_portal.hpp> +#include <uhdlib/rfnoc/rfnoc_common.hpp> +#include <uhdlib/transport/io_service.hpp> +#include <uhdlib/transport/link_if.hpp> + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::detail; +using namespace uhd::transport; + +tx_flow_ctrl_sender::tx_flow_ctrl_sender( + const chdr::chdr_packet_factory& pkt_factory, const sep_id_pair_t sep_ids) + : _dst_epid(sep_ids.second) +{ + _fc_packet = pkt_factory.make_strc(); + _fc_strc_pyld.src_epid = sep_ids.first; + _fc_strc_pyld.op_code = chdr::STRC_RESYNC; +} + +chdr_tx_data_xport::chdr_tx_data_xport(uhd::transport::io_service::sptr io_srv, + uhd::transport::recv_link_if::sptr recv_link, + uhd::transport::send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const uhd::rfnoc::sep_id_pair_t& epids, + const size_t num_send_frames, + const fc_params_t fc_params) + : _fc_state(fc_params.buff_capacity) + , _fc_sender(pkt_factory, epids) + , _epid(epids.first) +{ + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Creating tx xport with local epid=" << epids.first + << ", remote epid=" << epids.second); + + _send_header.set_dst_epid(epids.second); + _send_packet = pkt_factory.make_generic(); + _recv_packet = pkt_factory.make_generic(); + + // Calculate max payload size + const size_t pyld_offset = + _send_packet->calculate_payload_offset(chdr::PKT_TYPE_DATA_WITH_TS); + _max_payload_size = send_link->get_send_frame_size() - pyld_offset; + + // Now create the send I/O we will use for data + auto send_cb = [this](buff_t::uptr& buff, transport::send_link_if* send_link) { + this->_send_callback(buff, send_link); + }; + + auto recv_cb = [this](buff_t::uptr& buff, + transport::recv_link_if* recv_link, + transport::send_link_if* send_link) { + return this->_recv_callback(buff, recv_link, send_link); + }; + + // Needs just a single recv frame for strs packets + _send_io = io_srv->make_send_client(send_link, + num_send_frames, + send_cb, + recv_link, + /* num_recv_frames */ 1, + recv_cb); +} + +/* + * To configure flow control, we need to send an init strc packet, then + * receive a strs containing the stream endpoint ingress buffer size. We + * then repeat this (now that we know the buffer size) to configure the flow + * control frequency. To avoid having this logic within the data packet + * processing flow, we use temporary send and recv I/O instances with + * simple callbacks here. + */ +static chdr_tx_data_xport::fc_params_t configure_flow_ctrl(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + const sep_id_pair_t epids, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + chdr::chdr_strc_packet::uptr strc_packet = pkt_factory.make_strc(); + chdr::chdr_packet::uptr recv_packet = pkt_factory.make_generic(); + + // No flow control at initialization, just release all send buffs + auto send_cb = [](frame_buff::uptr& buff, send_link_if* send_link) { + send_link->release_send_buff(std::move(buff)); + buff = nullptr; + }; + + // For recv, just queue strs packets for recv_io to read + auto recv_cb = [&recv_packet, epids](frame_buff::uptr& buff, + recv_link_if* /*recv_link*/, + send_link_if* /*send_link*/) { + recv_packet->refresh(buff->data()); + const auto header = recv_packet->get_chdr_header(); + const auto type = header.get_pkt_type(); + const auto dst_epid = header.get_dst_epid(); + + return (dst_epid == epids.first && type == chdr::PKT_TYPE_STRS); + }; + + // No flow control at initialization, just release all recv buffs + auto fc_cb = + [](frame_buff::uptr buff, recv_link_if* recv_link, send_link_if* /*send_link*/) { + recv_link->release_recv_buff(std::move(buff)); + }; + + auto send_io = io_srv->make_send_client(send_link, + 1, // num_send_frames + send_cb, + nullptr, + 0, // num_recv_frames + nullptr); + + auto recv_io = io_srv->make_recv_client(recv_link, + 1, // num_recv_frames + recv_cb, + nullptr, + 0, // num_send_frames + fc_cb); + + // Function to send a strc init + auto send_strc_init = [&send_io, epids, &strc_packet]( + const stream_buff_params_t fc_freq = {0, 0}) { + frame_buff::uptr buff = send_io->get_send_buff(0); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out getting a send buffer for strc init"); + } + + chdr::chdr_header header; + header.set_seq_num(0); + header.set_dst_epid(epids.second); + + chdr::strc_payload strc_pyld; + strc_pyld.src_epid = epids.first; + strc_pyld.op_code = chdr::STRC_INIT; + strc_pyld.num_bytes = fc_freq.bytes; + strc_pyld.num_pkts = fc_freq.packets; + strc_packet->refresh(buff->data(), header, strc_pyld); + + const size_t size = header.get_length(); + buff->set_packet_size(size); + send_io->release_send_buff(std::move(buff)); + }; + + // Function to receive a strs, returns buffer capacity + auto recv_strs = [&recv_io, &recv_packet]() -> stream_buff_params_t { + frame_buff::uptr buff = recv_io->get_recv_buff(200); + + if (!buff) { + throw uhd::runtime_error( + "tx xport timed out wating for a strs packet during initialization"); + } + + recv_packet->refresh(buff->data()); + UHD_ASSERT_THROW( + recv_packet->get_chdr_header().get_pkt_type() == chdr::PKT_TYPE_STRS); + chdr::strs_payload strs; + strs.deserialize(recv_packet->get_payload_const_ptr_as<uint64_t>(), + recv_packet->get_payload_size() / sizeof(uint64_t), + recv_packet->conv_to_host<uint64_t>()); + + recv_io->release_recv_buff(std::move(buff)); + + return {strs.capacity_bytes, static_cast<uint32_t>(strs.capacity_pkts)}; + }; + + // Send a strc init to get the buffer size + send_strc_init(); + stream_buff_params_t capacity = recv_strs(); + + UHD_LOG_TRACE("XPORT::TX_DATA_XPORT", + "Received strs initializing buffer capacity to " << capacity.bytes << " bytes"); + + // Calculate the requested fc_freq parameters + stream_buff_params_t fc_freq = { + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_freq_ratio)), + static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_freq_ratio))}; + + const size_t headroom_bytes = + static_cast<uint64_t>(std::ceil(double(capacity.bytes) * fc_headroom_ratio)); + const size_t headroom_packets = + static_cast<uint32_t>(std::ceil(double(capacity.packets) * fc_headroom_ratio)); + + fc_freq.bytes -= headroom_bytes; + fc_freq.packets -= headroom_packets; + + // Send a strc init to configure fc freq + send_strc_init(fc_freq); + recv_strs(); + + // Release temporary I/O service interfaces to disconnect from it + send_io.reset(); + recv_io.reset(); + + return {capacity}; +} + +chdr_tx_data_xport::fc_params_t chdr_tx_data_xport::configure_sep(io_service::sptr io_srv, + recv_link_if::sptr recv_link, + send_link_if::sptr send_link, + const chdr::chdr_packet_factory& pkt_factory, + mgmt::mgmt_portal& mgmt_portal, + const sep_id_pair_t& epids, + const sw_buff_t pyld_buff_fmt, + const sw_buff_t mdata_buff_fmt, + const double fc_freq_ratio, + const double fc_headroom_ratio) +{ + const sep_id_t remote_epid = epids.second; + const sep_id_t local_epid = epids.first; + + // Create a control transport with the tx data links to send mgmt packets + // needed to setup the stream. Only need one frame for this. + auto ctrl_xport = chdr_ctrl_xport::make(io_srv, + send_link, + recv_link, + pkt_factory, + local_epid, + 1, // num_send_frames + 1); // num_recv_frames + + // Setup a route to the EPID + mgmt_portal.setup_local_route(*ctrl_xport, remote_epid); + + mgmt_portal.config_local_tx_stream( + *ctrl_xport, remote_epid, pyld_buff_fmt, mdata_buff_fmt); + + // We no longer need the control xport, release it so + // the control xport is no longer connected to the I/O service. + ctrl_xport.reset(); + + return configure_flow_ctrl(io_srv, + recv_link, + send_link, + pkt_factory, + epids, + fc_freq_ratio, + fc_headroom_ratio); +} |