From 20baa413a08cdf42ec30d6bc0aeb0c665ee590fe Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Tue, 4 Jun 2019 15:35:44 -0700 Subject: rfnoc: Make a chdr_ctrl_xport using the new link APIs These changes add APIs to instantiate the new transports. However, only the control/management transport is currently implemented. It uses the chdr_ctrl_xport. Also update the mgmt_portal to use an ephemeral reference to the shared transport, to indicate that it has no ownership of the transport's memory. --- .../include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp | 5 +- .../include/uhdlib/rfnoc/link_stream_manager.hpp | 10 ++- host/lib/include/uhdlib/rfnoc/mb_iface.hpp | 51 ++++++++++-- host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp | 28 +++++-- host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp | 16 +--- host/lib/rfnoc/chdr_ctrl_endpoint.cpp | 34 +++++--- host/lib/rfnoc/link_stream_manager.cpp | 79 ++++++++---------- host/lib/rfnoc/mgmt_portal.cpp | 94 ++++++++++++---------- 8 files changed, 182 insertions(+), 135 deletions(-) diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp index b3c3e0108..1281cc0ea 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP #define INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP +#include #include #include #include @@ -46,11 +47,11 @@ public: //! Creates a control endpoint object // - // \param xports The transports used to send and recv packets + // \param xport The transport used to send and recv packets // \param pkt_factor An instance of the CHDR packet factory // \param my_epid The endpoint ID of this software endpoint // - static uptr make(const chdr_ctrl_xport_t& xports, + static uptr make(chdr_ctrl_xport::sptr xport, const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid); diff --git a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp index 4ff69bb3e..79121a498 100644 --- a/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp +++ b/host/lib/include/uhdlib/rfnoc/link_stream_manager.hpp @@ -17,8 +17,8 @@ namespace uhd { namespace rfnoc { -/*! A class that is responsible managing all data endpoints, control endpoints and client - * zero instances accessible via a logical link between the host device and +/*! A class that is responsible for managing all data endpoints, control endpoints and + * client zero instances accessible via a logical link between the host device and * motherboard. * * Note that each transport adapter on the host has its own set of streaming endpoints, @@ -120,7 +120,8 @@ public: * \param xport_args The transport arguments * \return An transport instance */ - virtual chdr_data_xport_t create_host_to_device_data_stream(const sep_addr_t dst_addr, + virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( + const sep_addr_t dst_addr, const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, @@ -139,7 +140,8 @@ public: * \param xport_args The transport arguments * \return An transport instance */ - virtual chdr_data_xport_t create_device_to_host_data_stream(const sep_addr_t src_addr, + virtual chdr_rx_data_xport::uptr create_device_to_host_data_stream( + const sep_addr_t src_addr, const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, diff --git a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp index ce1106c4c..cca8dcab8 100644 --- a/host/lib/include/uhdlib/rfnoc/mb_iface.hpp +++ b/host/lib/include/uhdlib/rfnoc/mb_iface.hpp @@ -7,11 +7,20 @@ #ifndef INCLUDED_LIBUHD_MB_IFACE_HPP #define INCLUDED_LIBUHD_MB_IFACE_HPP +#include #include #include namespace uhd { namespace rfnoc { +// FIXME: Update this +class chdr_rx_data_xport +{ +public: + using uptr = std::unique_ptr; +}; + +using chdr_tx_data_xport = chdr_rx_data_xport; /*! Motherboard (backchannel) interface * @@ -59,14 +68,46 @@ public: virtual void reset_network() = 0; /*! Create a control transport + * + * This is usually called once per motherboard, since there is only one + * control transport required to talk to all the blocks on the control + * crossbar. + * + * \param local_device_id ID for the host transport adapter to use + * \param local_epid Host streaming endpoint ID + * \return A CHDR control transport */ - virtual chdr_ctrl_xport_t make_ctrl_transport( - device_id_t local_device_id, const sep_id_t& src_epid) = 0; + virtual chdr_ctrl_xport::sptr make_ctrl_transport( + device_id_t local_device_id, const sep_id_t& local_epid) = 0; - /*! Create a data transport + /*! Create an RX data transport + * + * This is typically called once per streaming channel. + * + * \param local_device_id ID for the host transport adapter to use + * \param local_epid Host (sink) streaming endpoint ID + * \param remote_epid Remote device (source) streaming endpoint ID + * \param xport_args Transport configuration args + * \return A CHDR RX data transport + */ + virtual chdr_rx_data_xport::uptr make_rx_data_transport(device_id_t local_device_id, + const sep_id_t& local_epid, + const sep_id_t& remote_epid, + const device_addr_t& xport_args) = 0; + + /*! Create an TX data transport + * + * This is typically called once per streaming channel. + * + * \param local_device_id ID for the host transport adapter to use + * \param local_epid Host (source) streaming endpoint ID + * \param remote_epid Remote device (sink) streaming endpoint ID + * \param xport_args Transport configuration args + * \return A CHDR TX data transport */ - virtual chdr_data_xport_t make_data_transport(device_id_t local_device_id, - const sep_id_t& src_epid, + virtual chdr_tx_data_xport::uptr make_tx_data_transport(device_id_t local_device_id, + const sep_id_t& local_epid, + const sep_id_t& remote_epid, const device_addr_t& xport_args) = 0; }; diff --git a/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp index 1412d0e3d..ac72931bf 100644 --- a/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp +++ b/host/lib/include/uhdlib/rfnoc/mgmt_portal.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_LIBUHD_MGMT_PORTAL_HPP #define INCLUDED_LIBUHD_MGMT_PORTAL_HPP +#include #include #include #include @@ -54,10 +55,12 @@ public: //! Initialize a stream endpoint and assign an endpoint ID to it // + // \param xport The host stream endpoint's CTRL transport // \param addr The physical address of the stream endpoint // \param epid The endpoint ID to assign to this endpoint // - virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) = 0; + virtual void initialize_endpoint( + chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid) = 0; //! Get information about a discovered (reachable) stream endpoint // @@ -77,9 +80,10 @@ public: // destination simply by setting the DstEPID in the CHDR header to the specified // dst_epid // + // \param xport The host stream endpoint's CTRL transport // \param dst_epid The endpoint ID of the destination // - virtual void setup_local_route(const sep_id_t& dst_epid) = 0; + virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid) = 0; //! Can a route from between the source and destination endpoints be established? // @@ -95,11 +99,12 @@ public: // to the destination simply by setting the DstEPID in the CHDR header to the // specified dst_epid // + // \param xport The host stream endpoint's CTRL transport // \param dst_epid The endpoint ID of the destination // \param src_epid The endpoint ID of the source // virtual void setup_remote_route( - const sep_id_t& dst_epid, const sep_id_t& src_epid) = 0; + chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid) = 0; //! Start configuring a flow controlled receive data stream from the endpoint with the // specified ID to this SW mgmt portal. @@ -108,6 +113,7 @@ public: // control handler needs to acknoweledge the setup transaction then call the commit // function below. // + // \param xport The host stream endpoint's CTRL transport (same EPID as RX stream) // \param epid The endpoint ID of the data source // \param lossy_xport Is the transport lossy? (e.g. UDP, not liberio) // \param pyld_buff_fmt Datatype of SW buffer that holds the data payload @@ -116,7 +122,8 @@ public: // \param fc_freq Flow control headroom parameters // \param reset Reset ingress stream endpoint state // - virtual void config_local_rx_stream_start(const sep_id_t& epid, + virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport, + const sep_id_t& epid, const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, @@ -127,19 +134,22 @@ public: //! Finish configuring a flow controlled receive data stream from the endpoint with // the specified ID to this SW mgmt portal. // + // \param xport The host stream endpoint's CTRL transport (same EPID as RX stream) // \param epid The endpoint ID of the data source // virtual stream_buff_params_t config_local_rx_stream_commit( - const sep_id_t& epid, const double timeout = 0.2) = 0; + chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2) = 0; //! Configure a flow controlled transmit data stream from this SW mgmt portal to the // endpoint with the specified ID. // + // \param xport The host stream endpoint's CTRL transport (same EPID as TX stream) // \param pyld_buff_fmt Datatype of SW buffer that holds the data payload // \param mdata_buff_fmt Datatype of SW buffer that holds the data metadata // \param reset Reset ingress stream endpoint state // - virtual void config_local_tx_stream(const sep_id_t& epid, + virtual void config_local_tx_stream(chdr_ctrl_xport& xport, + const sep_id_t& epid, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, const bool reset = false) = 0; @@ -147,6 +157,7 @@ public: //! Configure a flow controlled data stream from the endpoint with ID src_epid to the // endpoint with ID dst_epid // + // \param xport The host stream endpoint's CTRL transport // \param dst_epid The endpoint ID of the destination // \param src_epid The endpoint ID of the source // \param lossy_xport Is the transport lossy? @@ -155,7 +166,8 @@ public: // \param fc_freq Flow control response frequency parameters // \param fc_freq Flow control headroom parameters // - virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid, + virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport, + const sep_id_t& dst_epid, const sep_id_t& src_epid, const bool lossy_xport, const stream_buff_params_t& fc_freq, @@ -176,7 +188,7 @@ public: //! Create an endpoint manager object // - static uptr make(const chdr_ctrl_xport_t& xport, + static uptr make(chdr_ctrl_xport& xport, const chdr::chdr_packet_factory& pkt_factory, sep_addr_t my_sep_addr, sep_id_t my_epid); diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp index c08c8d74a..7ec1b7bb2 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_common.hpp @@ -7,7 +7,7 @@ #ifndef INCLUDED_RFNOC_RFNOC_COMMON_HPP #define INCLUDED_RFNOC_RFNOC_COMMON_HPP -#include +#include #include namespace uhd { namespace rfnoc { @@ -66,20 +66,6 @@ struct stream_buff_params_t //! The data type of the buffer used to capture/generate data enum sw_buff_t { BUFF_U64 = 0, BUFF_U32 = 1, BUFF_U16 = 2, BUFF_U8 = 3 }; -// TODO: Update these -struct chdr_ctrl_xport_t -{ - chdr_ctrl_xport_t() = default; - uhd::transport::zero_copy_if::sptr recv; - uhd::transport::zero_copy_if::sptr send; - stream_buff_params_t recv_buff_params{0, 0}; - stream_buff_params_t send_buff_params{0, 0}; - sep_id_t src_epid{0}; - sep_id_t dst_epid{0}; -}; - -using chdr_data_xport_t = chdr_ctrl_xport_t; - //---------------------------------------------- // Constants //---------------------------------------------- diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp index d1d1dccca..d3c7cd58f 100644 --- a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp +++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp @@ -27,7 +27,7 @@ chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default; class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint { public: - chdr_ctrl_endpoint_impl(const chdr_ctrl_xport_t& xport, + chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport, const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid) : _my_epid(my_epid) @@ -57,7 +57,14 @@ public: // there are no timed blocks on the underlying. _recv_thread.join(); // Flush base transport - while (_xport.recv->get_recv_buff(0.0001)) /*NOP*/; + while (true) { + auto buff = _xport->get_recv_buff(100); + if (buff) { + _xport->release_recv_buff(std::move(buff)); + } else { + break; + } + } // Release child endpoints _endpoint_map.clear();); } @@ -82,9 +89,10 @@ public: header.set_dst_epid(dst_epid); // Acquire send buffer and send the packet std::lock_guard lock(_send_mutex); - auto send_buff = _xport.send->get_send_buff(timeout); - _send_pkt->refresh(send_buff->cast(), header, payload); - send_buff->commit(header.get_length()); + auto send_buff = _xport->get_send_buff(timeout * 1000); + _send_pkt->refresh(send_buff->data(), header, payload); + send_buff->set_packet_size(header.get_length()); + _xport->release_send_buff(std::move(send_buff)); }; if (_endpoint_map.find(key) == _endpoint_map.end()) { @@ -118,11 +126,14 @@ private: // - Route them based on the dst_port // - Pass them to the ctrlport_endpoint for additional processing while (not _stop_recv_thread) { - auto buff = _xport.recv->get_recv_buff(0.0); + // FIXME Move lock back once have threaded_io_service + std::unique_lock lock(_mutex); + auto buff = _xport->get_recv_buff(0); if (buff) { - std::lock_guard lock(_mutex); + // FIXME Move lock back to here once have threaded_io_service + // std::lock_guard lock(_mutex); try { - _recv_pkt->refresh(buff->cast()); + _recv_pkt->refresh(buff->data()); const ctrl_payload payload = _recv_pkt->get_payload(); ep_map_key_t key{payload.src_epid, payload.dst_port}; if (_endpoint_map.find(key) != _endpoint_map.end()) { @@ -131,7 +142,10 @@ private: } catch (...) { // Ignore all errors } + _xport->release_recv_buff(std::move(buff)); } else { + // FIXME Move lock back to lock_guard once have threaded_io_service + lock.unlock(); // Be a good citizen and yield if no packet is processed static const size_t MIN_DUR = 1; boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR)); @@ -154,7 +168,7 @@ private: // The endpoint ID of this software endpoint const sep_id_t _my_epid; // Send/recv transports - const chdr_ctrl_xport_t _xport; + chdr_ctrl_xport::sptr _xport; // The curent sequence number for a send packet size_t _send_seqnum = 0; // The number of packets dropped @@ -173,7 +187,7 @@ private: std::mutex _send_mutex; }; -chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const chdr_ctrl_xport_t& xport, +chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport, const chdr::chdr_packet_factory& pkt_factory, sep_id_t my_epid) { diff --git a/host/lib/rfnoc/link_stream_manager.cpp b/host/lib/rfnoc/link_stream_manager.cpp index bd330e313..4fe183529 100644 --- a/host/lib/rfnoc/link_stream_manager.cpp +++ b/host/lib/rfnoc/link_stream_manager.cpp @@ -63,31 +63,10 @@ public: // chdr_ctrl_endpoint. We have to use the same base transport here to ensure that // the route setup logic in the FPGA transport works correctly. // TODO: This needs to be cleaned up. A muxed_zero_copy_if is excessive here - chdr_ctrl_xport_t base_xport = - _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid); - UHD_ASSERT_THROW(base_xport.send.get() == base_xport.recv.get()) - - auto classify_fn = [&pkt_factory](void* buff, size_t) -> uint32_t { - if (buff) { - chdr_packet::cuptr pkt = pkt_factory.make_generic(); - pkt->refresh(buff); - return (pkt->get_chdr_header().get_pkt_type() == PKT_TYPE_MGMT) ? 0 : 1; - } else { - throw uhd::assertion_error("null pointer"); - } - }; - _muxed_xport = muxed_zero_copy_if::make(base_xport.send, classify_fn, 2); - - // Create child transports - chdr_ctrl_xport_t mgmt_xport = base_xport; - mgmt_xport.send = _muxed_xport->make_stream(0); - mgmt_xport.recv = mgmt_xport.send; - _ctrl_xport = base_xport; - _ctrl_xport.send = _muxed_xport->make_stream(1); - _ctrl_xport.recv = _ctrl_xport.send; + _ctrl_xport = _mb_iface.make_ctrl_transport(_my_device_id, _my_mgmt_ctrl_epid); // Create management portal using one of the child transports - _mgmt_portal = mgmt_portal::make(mgmt_xport, + _mgmt_portal = mgmt_portal::make(*_ctrl_xport, _pkt_factory, sep_addr_t(_my_device_id, SEP_INST_MGMT_CTRL), _my_mgmt_ctrl_epid); @@ -131,8 +110,8 @@ public: } // Setup a route to the EPID - _mgmt_portal->initialize_endpoint(dst_addr, dst_epid); - _mgmt_portal->setup_local_route(dst_epid); + _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid); + _mgmt_portal->setup_local_route(*_ctrl_xport, dst_epid); if (!_mgmt_portal->get_endpoint_info(dst_epid).has_ctrl) { throw uhd::rfnoc_error( "Downstream endpoint does not support control traffic"); @@ -157,10 +136,10 @@ public: sep_id_t src_epid = _epid_alloc->allocate_epid(src_addr); // Initialize endpoints - _mgmt_portal->initialize_endpoint(dst_addr, dst_epid); - _mgmt_portal->initialize_endpoint(src_addr, src_epid); + _mgmt_portal->initialize_endpoint(*_ctrl_xport, dst_addr, dst_epid); + _mgmt_portal->initialize_endpoint(*_ctrl_xport, src_addr, src_epid); - _mgmt_portal->setup_remote_route(dst_epid, src_epid); + _mgmt_portal->setup_remote_route(*_ctrl_xport, dst_epid, src_epid); return sep_id_pair_t(src_epid, dst_epid); } @@ -214,13 +193,15 @@ public: // EPIDs) // Setup a stream - stream_buff_params_t buff_params = _mgmt_portal->config_remote_stream(dst_epid, - src_epid, - lossy_xport, - stream_buff_params_t{1, 1}, // Dummy frequency - stream_buff_params_t{0, 0}, // Dummy headroom - false, - STREAM_SETUP_TIMEOUT); + stream_buff_params_t buff_params = + _mgmt_portal->config_remote_stream(*_ctrl_xport, + dst_epid, + src_epid, + lossy_xport, + stream_buff_params_t{1, 1}, // Dummy frequency + stream_buff_params_t{0, 0}, // Dummy headroom + false, + STREAM_SETUP_TIMEOUT); // Compute FC frequency and headroom based on buff parameters stream_buff_params_t fc_freq{ @@ -234,7 +215,8 @@ public: std::ceil(double(buff_params.packets) * fc_headroom_ratio))}; // Reconfigure flow control using the new frequency and headroom - return _mgmt_portal->config_remote_stream(dst_epid, + return _mgmt_portal->config_remote_stream(*_ctrl_xport, + dst_epid, src_epid, lossy_xport, fc_freq, @@ -243,7 +225,8 @@ public: STREAM_SETUP_TIMEOUT); } - virtual chdr_data_xport_t create_host_to_device_data_stream(const sep_addr_t dst_addr, + virtual chdr_tx_data_xport::uptr create_host_to_device_data_stream( + const sep_addr_t dst_addr, const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, @@ -261,21 +244,22 @@ public: sep_id_t dst_epid = _epid_alloc->allocate_epid(dst_addr); // Create the data transport that we will return to the client - chdr_data_xport_t xport = - _mb_iface.make_data_transport(_my_device_id, src_epid, xport_args); - xport.src_epid = src_epid; - xport.dst_epid = dst_epid; + chdr_rx_data_xport::uptr xport = _mb_iface.make_rx_data_transport( + _my_device_id, src_epid, dst_epid, xport_args); + + chdr_ctrl_xport::sptr mgmt_xport = + _mb_iface.make_ctrl_transport(_my_device_id, src_epid); // Create new temporary management portal with the transports used for this stream // TODO: This is a bit excessive. Maybe we can pair down the functionality of the // portal just for route setup purposes. Whatever we do, we *must* use xport in it // though otherwise the transport will not behave correctly. mgmt_portal::uptr data_mgmt_portal = - mgmt_portal::make(xport, _pkt_factory, sw_epid_addr, src_epid); + mgmt_portal::make(*mgmt_xport, _pkt_factory, sw_epid_addr, src_epid); // Setup a route to the EPID - data_mgmt_portal->initialize_endpoint(dst_addr, dst_epid); - data_mgmt_portal->setup_local_route(dst_epid); + data_mgmt_portal->initialize_endpoint(*mgmt_xport, dst_addr, dst_epid); + data_mgmt_portal->setup_local_route(*mgmt_xport, dst_epid); if (!_mgmt_portal->get_endpoint_info(dst_epid).has_data) { throw uhd::rfnoc_error("Downstream endpoint does not support data traffic"); } @@ -288,7 +272,8 @@ public: return xport; } - virtual chdr_data_xport_t create_device_to_host_data_stream(const sep_addr_t src_addr, + virtual chdr_tx_data_xport::uptr create_device_to_host_data_stream( + const sep_addr_t src_addr, const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, @@ -297,6 +282,7 @@ public: const device_addr_t& xport_args) { // TODO: Implement me + return chdr_tx_data_xport::uptr(); } private: @@ -323,8 +309,7 @@ private: // The software EPID for all management and control traffic sep_id_t _my_mgmt_ctrl_epid; // Transports - muxed_zero_copy_if::sptr _muxed_xport; - chdr_ctrl_xport_t _ctrl_xport; + chdr_ctrl_xport::sptr _ctrl_xport; // Management portal for control endpoints mgmt_portal::uptr _mgmt_portal; // The CHDR control endpoint diff --git a/host/lib/rfnoc/mgmt_portal.cpp b/host/lib/rfnoc/mgmt_portal.cpp index 79e297407..b490e0baf 100644 --- a/host/lib/rfnoc/mgmt_portal.cpp +++ b/host/lib/rfnoc/mgmt_portal.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -174,7 +175,7 @@ mgmt_portal::~mgmt_portal() {} class mgmt_portal_impl : public mgmt_portal { public: - mgmt_portal_impl(const chdr_ctrl_xport_t& xport, + mgmt_portal_impl(chdr_ctrl_xport& xport, const chdr::chdr_packet_factory& pkt_factory, sep_addr_t my_sep_addr, sep_id_t my_epid) @@ -183,14 +184,12 @@ public: , _chdr_w(pkt_factory.get_chdr_w()) , _endianness(pkt_factory.get_endianness()) , _my_node_id(my_sep_addr.first, NODE_TYPE_STRM_EP, my_epid) - , _recv_xport(xport.recv) - , _send_xport(xport.send) , _send_seqnum(0) , _send_pkt(std::move(pkt_factory.make_mgmt())) , _recv_pkt(std::move(pkt_factory.make_mgmt())) { std::lock_guard lock(_mutex); - _discover_topology(); + _discover_topology(xport); UHD_LOG_DEBUG("RFNOC::MGMT", "The following endpoints are reachable from " << _my_node_id.to_string()); for (const auto& ep : _discovered_ep_set) { @@ -205,7 +204,8 @@ public: return _discovered_ep_set; } - virtual void initialize_endpoint(const sep_addr_t& addr, const sep_id_t& epid) + virtual void initialize_endpoint( + chdr_ctrl_xport& xport, const sep_addr_t& addr, const sep_id_t& epid) { std::lock_guard lock(_mutex); @@ -232,7 +232,7 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); // Add/update the entry in the stream endpoint ID map _epid_addr_map[epid] = addr; @@ -280,7 +280,7 @@ public: return retval; } - virtual void setup_local_route(const sep_id_t& dst_epid) + virtual void setup_local_route(chdr_ctrl_xport& xport, const sep_id_t& dst_epid) { std::lock_guard lock(_mutex); @@ -339,7 +339,7 @@ public: cfg_xact.add_hop(discover_hop); // Send the transaction and validate that we saw a stream endpoint - const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(cfg_xact); + const mgmt_payload sep_info_xact = _send_recv_mgmt_transaction(xport, cfg_xact); const node_id_t sep_node = _pop_node_discovery_hop(sep_info_xact); if (sep_node.type != NODE_TYPE_STRM_EP) { throw uhd::routing_error( @@ -384,7 +384,8 @@ public: return false; } - virtual void setup_remote_route(const sep_id_t& dst_epid, const sep_id_t& src_epid) + virtual void setup_remote_route( + chdr_ctrl_xport& xport, const sep_id_t& dst_epid, const sep_id_t& src_epid) { std::lock_guard lock(_mutex); @@ -412,8 +413,8 @@ public: // there is a need to optimize for routing table fullness, we can do a software // graph traversal here, find the closest common parent (crossbar) for the two // nodes and only configure the nodes downstream of that. - setup_local_route(dst_epid); - setup_local_route(src_epid); + setup_local_route(xport, dst_epid); + setup_local_route(xport, src_epid); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format( @@ -421,7 +422,8 @@ public: % src_epid % dst_epid)); } - virtual void config_local_rx_stream_start(const sep_id_t& epid, + virtual void config_local_rx_stream_start(chdr_ctrl_xport& xport, + const sep_id_t& epid, const bool lossy_xport, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, @@ -464,29 +466,30 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. cfg_xact.add_hop(cfg_hop); - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Initiated RX stream setup for EPID=%d") % epid)); } virtual stream_buff_params_t config_local_rx_stream_commit( - const sep_id_t& epid, const double timeout = 0.2) + chdr_ctrl_xport& xport, const sep_id_t& epid, const double timeout = 0.2) { std::lock_guard lock(_mutex); // Wait for stream configuration to finish on the HW side const node_addr_t& node_addr = _lookup_sep_node_addr(epid); - _validate_stream_setup(node_addr, timeout); + _validate_stream_setup(xport, node_addr, timeout); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Finished RX stream setup for EPID=%d") % epid)); // Return discovered buffer parameters - return std::get<1>(_get_ostrm_status(node_addr)); + return std::get<1>(_get_ostrm_status(xport, node_addr)); } - virtual void config_local_tx_stream(const sep_id_t& epid, + virtual void config_local_tx_stream(chdr_ctrl_xport& xport, + const sep_id_t& epid, const sw_buff_t pyld_buff_fmt, const sw_buff_t mdata_buff_fmt, const bool reset = false) @@ -494,7 +497,7 @@ public: std::lock_guard lock(_mutex); // First setup a route between to the endpoint - setup_local_route(epid); + setup_local_route(xport, epid); const node_addr_t& node_addr = _lookup_sep_node_addr(epid); @@ -522,13 +525,14 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Finished TX stream setup for EPID=%d") % epid)); } - virtual stream_buff_params_t config_remote_stream(const sep_id_t& dst_epid, + virtual stream_buff_params_t config_remote_stream(chdr_ctrl_xport& xport, + const sep_id_t& dst_epid, const sep_id_t& src_epid, const bool lossy_xport, const stream_buff_params_t& fc_freq, @@ -539,7 +543,7 @@ public: std::lock_guard lock(_mutex); // First setup a route between the two endpoints - setup_remote_route(dst_epid, src_epid); + setup_remote_route(xport, dst_epid, src_epid); const node_addr_t& dst_node_addr = _lookup_sep_node_addr(dst_epid); const node_addr_t& src_node_addr = _lookup_sep_node_addr(src_epid); @@ -557,7 +561,7 @@ public: (i == 0) ? RESET_AND_FLUSH_OSTRM : RESET_AND_FLUSH_ISTRM))); rst_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_RETURN)); rst_xact.add_hop(rst_hop); - _send_recv_mgmt_transaction(rst_xact); + _send_recv_mgmt_transaction(xport, rst_xact); } } @@ -579,18 +583,18 @@ public: // Send the transaction and receive a response. // We don't care about the contents of the response. cfg_xact.add_hop(cfg_hop); - _send_recv_mgmt_transaction(cfg_xact); + _send_recv_mgmt_transaction(xport, cfg_xact); } // Wait for stream configuration to finish on the HW side - _validate_stream_setup(src_node_addr, timeout); + _validate_stream_setup(xport, src_node_addr, timeout); UHD_LOG_DEBUG("RFNOC::MGMT", (boost::format("Setup a stream from EPID=%d to EPID=%d") % src_epid % dst_epid)); // Return discovered buffer parameters - return std::get<1>(_get_ostrm_status(src_node_addr)); + return std::get<1>(_get_ostrm_status(xport, src_node_addr)); } @@ -605,7 +609,7 @@ public: private: // Functions // Discover all nodes that are reachable from this software stream endpoint - void _discover_topology() + void _discover_topology(chdr_ctrl_xport& xport) { // Initialize a queue of pending paths. We will use this for a breadth-first // traversal of the dataflow graph. The queue consists of a previously discovered @@ -652,7 +656,7 @@ private: // Functions try { // Send the discovery transaction const mgmt_payload disc_resp_xact = - _send_recv_mgmt_transaction(disc_req_xact); + _send_recv_mgmt_transaction(xport, disc_req_xact); new_node = _pop_node_discovery_hop(disc_resp_xact); } catch (uhd::io_error& io_err) { // We received an IO error. This could happen if we have a legitimate @@ -695,7 +699,7 @@ private: // Functions mgmt_payload init_req_xact(route_xact); _push_node_init_hop(init_req_xact, new_node); const mgmt_payload init_resp_xact = - _send_recv_mgmt_transaction(init_req_xact); + _send_recv_mgmt_transaction(xport, init_req_xact); UHD_LOG_DEBUG("RFNOC::MGMT", "Initialized node " << new_node.to_string()); // If the new node is a stream endpoint then we are done traversing this @@ -823,7 +827,7 @@ private: // Functions // Send/recv a management transaction that will get the output stream status std::tuple _get_ostrm_status( - const node_addr_t& node_addr) + chdr_ctrl_xport& xport, const node_addr_t& node_addr) { // Build a management transaction to first get to the node mgmt_payload status_xact; @@ -844,7 +848,7 @@ private: // Functions status_xact.add_hop(cfg_hop); // Send the transaction, receive a response and validate it - const mgmt_payload resp_xact = _send_recv_mgmt_transaction(status_xact); + const mgmt_payload resp_xact = _send_recv_mgmt_transaction(xport, status_xact); if (resp_xact.get_num_hops() != 1) { throw uhd::op_failed("Management operation failed. Incorrect format (hops)."); } @@ -875,13 +879,14 @@ private: // Functions } // Make sure that stream setup is complete and successful, else throw exception - void _validate_stream_setup(const node_addr_t& node_addr, const double timeout) + void _validate_stream_setup( + chdr_ctrl_xport& xport, const node_addr_t& node_addr, const double timeout) { // Get the status of the output stream uint32_t ostrm_status = 0; double sleep_s = 0.05; for (size_t i = 0; i < size_t(std::ceil(timeout / sleep_s)); i++) { - ostrm_status = std::get<0>(_get_ostrm_status(node_addr)); + ostrm_status = std::get<0>(_get_ostrm_status(xport, node_addr)); if ((ostrm_status & STRM_STATUS_SETUP_PENDING) != 0) { // Wait and retry std::chrono::milliseconds(static_cast(sleep_s * 1000)); @@ -975,7 +980,8 @@ private: // Functions } // Send the specified management transaction to the device - void _send_mgmt_transaction(const mgmt_payload& payload, double timeout = 0.1) + void _send_mgmt_transaction( + chdr_ctrl_xport& xport, const mgmt_payload& payload, double timeout = 0.1) { chdr_header header; header.set_pkt_type(PKT_TYPE_MGMT); @@ -984,18 +990,19 @@ private: // Functions header.set_length(payload.get_size_bytes() + (chdr_w_to_bits(_chdr_w) / 8)); header.set_dst_epid(0); - managed_send_buffer::sptr send_buff = _send_xport->get_send_buff(timeout); + auto send_buff = xport.get_send_buff(timeout * 1000); if (not send_buff) { UHD_LOG_ERROR("RFNOC::MGMT", "Timed out getting send buff for management transaction"); throw uhd::io_error("Timed out getting send buff for management transaction"); } - _send_pkt->refresh(send_buff->cast(), header, payload); - send_buff->commit(header.get_length()); + _send_pkt->refresh(send_buff->data(), header, payload); + send_buff->set_packet_size(header.get_length()); + xport.release_send_buff(std::move(send_buff)); } // Send the specified management transaction to the device and receive a response const mgmt_payload _send_recv_mgmt_transaction( - const mgmt_payload& transaction, double timeout = 0.1) + chdr_ctrl_xport& xport, const mgmt_payload& transaction, double timeout = 0.1) { mgmt_payload send(transaction); send.set_header(_my_epid, _protover, _chdr_w); @@ -1005,17 +1012,18 @@ private: // Functions nop_hop.add_op(mgmt_op_t(mgmt_op_t::MGMT_OP_NOP)); send.add_hop(nop_hop); // Send the transaction over the wire - _send_mgmt_transaction(send); + _send_mgmt_transaction(xport, send); - managed_recv_buffer::sptr recv_buff = _recv_xport->get_recv_buff(timeout); + auto recv_buff = xport.get_mgmt_buff(timeout * 1000); if (not recv_buff) { throw uhd::io_error("Timed out getting recv buff for management transaction"); } - _recv_pkt->refresh(recv_buff->cast()); + _recv_pkt->refresh(recv_buff->data()); mgmt_payload recv; recv.set_header(_my_epid, _protover, _chdr_w); _recv_pkt->fill_payload(recv); - return std::move(recv); + xport.release_recv_buff(std::move(recv_buff)); + return recv; } private: // Members @@ -1039,8 +1047,6 @@ private: // Members // endpoint std::map _epid_addr_map; // Send/recv transports - uhd::transport::zero_copy_if::sptr _recv_xport; - uhd::transport::zero_copy_if::sptr _send_xport; size_t _send_seqnum; // Management packet containers chdr_mgmt_packet::uptr _send_pkt; @@ -1053,7 +1059,7 @@ private: // Members }; // namespace mgmt -mgmt_portal::uptr mgmt_portal::make(const chdr_ctrl_xport_t& xport, +mgmt_portal::uptr mgmt_portal::make(chdr_ctrl_xport& xport, const chdr::chdr_packet_factory& pkt_factory, sep_addr_t my_sep_addr, sep_id_t my_epid) -- cgit v1.2.3