diff options
Diffstat (limited to 'host/lib/usrp/device3')
| -rw-r--r-- | host/lib/usrp/device3/device3_flow_ctrl.hpp | 259 | ||||
| -rw-r--r-- | host/lib/usrp/device3/device3_impl.cpp | 173 | ||||
| -rw-r--r-- | host/lib/usrp/device3/device3_impl.hpp | 154 | ||||
| -rw-r--r-- | host/lib/usrp/device3/device3_io_impl.cpp | 579 | 
4 files changed, 562 insertions, 603 deletions
| diff --git a/host/lib/usrp/device3/device3_flow_ctrl.hpp b/host/lib/usrp/device3/device3_flow_ctrl.hpp index 50081543a..535d7fbac 100644 --- a/host/lib/usrp/device3/device3_flow_ctrl.hpp +++ b/host/lib/usrp/device3/device3_flow_ctrl.hpp @@ -8,10 +8,10 @@  #define INCLUDED_DEVICE3_FLOW_CTRL_HPP  #include "device3_impl.hpp" -#include <uhd/utils/log.hpp> -#include <uhd/types/sid.hpp> -#include <uhd/transport/zero_copy.hpp>  #include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <uhd/types/sid.hpp> +#include <uhd/utils/log.hpp>  #include <boost/shared_ptr.hpp>  namespace uhd { namespace usrp { @@ -19,12 +19,14 @@ namespace uhd { namespace usrp {  //! Stores the state of RX flow control  struct rx_fc_cache_t  { -    rx_fc_cache_t(): -        interval(0), -        last_byte_count(0), -        total_bytes_consumed(0), -        total_packets_consumed(0), -        seq_num(0) {} +    rx_fc_cache_t() +        : interval(0) +        , last_byte_count(0) +        , total_bytes_consumed(0) +        , total_packets_consumed(0) +        , seq_num(0) +    { +    }      //! Flow control interval in bytes      size_t interval; @@ -40,117 +42,112 @@ struct rx_fc_cache_t      uhd::transport::zero_copy_if::sptr xport;      std::function<uint32_t(uint32_t)> to_host;      std::function<uint32_t(uint32_t)> from_host; -    std::function<void(const uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> unpack; -    std::function<void(uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> pack; +    std::function<void( +        const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        unpack; +    std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        pack;  };  /*! Send out RX flow control packets. -* -* This function handles updating the counters for the consumed -* bytes and packets, determines if a flow control message is -* is necessary, and sends one if it is.  Passing a nullptr for -* the buff parameter will skip the counter update. -* -* \param fc_cache RX flow control state information -* \param buff Receive buffer.  Setting to nullptr will -*             skip the counter update. -*/ + * + * This function handles updating the counters for the consumed + * bytes and packets, determines if a flow control message is + * is necessary, and sends one if it is.  Passing a nullptr for + * the buff parameter will skip the counter update. + * + * \param fc_cache RX flow control state information + * \param buff Receive buffer.  Setting to nullptr will + *             skip the counter update. + */  inline bool rx_flow_ctrl( -    boost::shared_ptr<rx_fc_cache_t> fc_cache, -    uhd::transport::managed_buffer::sptr buff -) { +    boost::shared_ptr<rx_fc_cache_t> fc_cache, uhd::transport::managed_buffer::sptr buff) +{      // If the caller supplied a buffer -    if (buff) -    { +    if (buff) {          // Unpack the header          uhd::transport::vrt::if_packet_info_t packet_info; -        packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); -        const uint32_t *pkt = buff->cast<const uint32_t *>(); +        packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); +        const uint32_t* pkt            = buff->cast<const uint32_t*>();          try {              fc_cache->unpack(pkt, packet_info); -        } -        catch(const std::exception &ex) -        { +        } catch (const std::exception& ex) {              // Log and ignore -            UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl; +            UHD_LOGGER_ERROR("RX FLOW CTRL") +                << "Error unpacking packet: " << ex.what() << std::endl;              return true;          }          // Update counters assuming the buffer is a consumed packet -        if (not packet_info.error) -        { +        if (not packet_info.error) {              fc_cache->total_bytes_consumed += buff->size();              fc_cache->total_packets_consumed++;          }      }      // Just return if there is no need to send a flow control packet -    if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) -    { +    if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) {          return true;      }      // Time to send a flow control packet      // Get a send buffer -    uhd::transport::managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); +    uhd::transport::managed_send_buffer::sptr fc_buff = +        fc_cache->xport->get_send_buff(0.0);      if (not fc_buff) {          throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer");      } -    uint32_t *pkt = fc_buff->cast<uint32_t *>(); +    uint32_t* pkt = fc_buff->cast<uint32_t*>(); -    //load packet info +    // load packet info      uhd::transport::vrt::if_packet_info_t packet_info;      packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC;      packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; -    packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); -    packet_info.packet_count = fc_cache->seq_num++; -    packet_info.sob = false; -    packet_info.eob = false; -    packet_info.error = false; -    packet_info.fc_ack = false; -    packet_info.sid = fc_cache->sid.get(); -    packet_info.has_sid = true; -    packet_info.has_cid = false; -    packet_info.has_tsi = false; -    packet_info.has_tsf = false; -    packet_info.has_tlr = false; +    packet_info.num_payload_bytes   = packet_info.num_payload_words32 * sizeof(uint32_t); +    packet_info.packet_count        = fc_cache->seq_num++; +    packet_info.sob                 = false; +    packet_info.eob                 = false; +    packet_info.error               = false; +    packet_info.fc_ack              = false; +    packet_info.sid                 = fc_cache->sid.get(); +    packet_info.has_sid             = true; +    packet_info.has_cid             = false; +    packet_info.has_tsi             = false; +    packet_info.has_tsf             = false; +    packet_info.has_tlr             = false;      // Load Header:      fc_cache->pack(pkt, packet_info);      // Load Payload: Packet count, and byte count -    pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = +    pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] =          fc_cache->from_host(fc_cache->total_packets_consumed); -    pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = +    pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] =          fc_cache->from_host(fc_cache->total_bytes_consumed); -    //send the buffer over the interface -    fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); +    // send the buffer over the interface +    fc_buff->commit(sizeof(uint32_t) * (packet_info.num_packet_words32)); -    //update byte count +    // update byte count      fc_cache->last_byte_count = fc_cache->total_bytes_consumed;      return true;  }  /*! Handle RX flow control ACK packets. -* -*/ + * + */  inline void handle_rx_flowctrl_ack( -    boost::shared_ptr<rx_fc_cache_t> fc_cache, -    const uint32_t *payload -) { -    const uint32_t pkt_count = fc_cache->to_host(payload[0]); +    boost::shared_ptr<rx_fc_cache_t> fc_cache, const uint32_t* payload) +{ +    const uint32_t pkt_count  = fc_cache->to_host(payload[0]);      const uint32_t byte_count = fc_cache->to_host(payload[1]); -    if (fc_cache->total_bytes_consumed != byte_count) -    { +    if (fc_cache->total_bytes_consumed != byte_count) {          UHD_LOGGER_DEBUG("device3")              << "oh noes: byte_count==" << byte_count -            << "  total_bytes_consumed==" << fc_cache->total_bytes_consumed -            << std::hex << " sid==" << fc_cache->sid << std::dec -            << std::endl -        ; +            << "  total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::hex +            << " sid==" << fc_cache->sid << std::dec << std::endl;      } -    fc_cache->total_bytes_consumed = byte_count; +    fc_cache->total_bytes_consumed   = byte_count;      fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too?      // This will send a flow control packet if there is a significant discrepancy @@ -160,14 +157,16 @@ inline void handle_rx_flowctrl_ack(  //! Stores the state of TX flow control  struct tx_fc_cache_t  { -    tx_fc_cache_t(uint32_t capacity): -        last_byte_ack(0), -        last_seq_ack(0), -        byte_count(0), -        pkt_count(0), -        window_size(capacity), -        fc_ack_seqnum(0), -        fc_received(false) {} +    tx_fc_cache_t(uint32_t capacity) +        : last_byte_ack(0) +        , last_seq_ack(0) +        , byte_count(0) +        , pkt_count(0) +        , window_size(capacity) +        , fc_ack_seqnum(0) +        , fc_received(false) +    { +    }      uint32_t last_byte_ack;      uint32_t last_seq_ack; @@ -178,26 +177,28 @@ struct tx_fc_cache_t      bool fc_received;      std::function<uint32_t(uint32_t)> to_host;      std::function<uint32_t(uint32_t)> from_host; -    std::function<void(const uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> unpack; -    std::function<void(uint32_t *packet_buff, uhd::transport::vrt::if_packet_info_t &)> pack; +    std::function<void( +        const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        unpack; +    std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        pack;  }; -inline bool tx_flow_ctrl( -    boost::shared_ptr<tx_fc_cache_t> fc_cache, +inline bool tx_flow_ctrl(boost::shared_ptr<tx_fc_cache_t> fc_cache,      uhd::transport::zero_copy_if::sptr xport, -    uhd::transport::managed_buffer::sptr buff -) { -    while (true) -    { +    uhd::transport::managed_buffer::sptr buff) +{ +    while (true) {          // If there is space -        if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) -        { +        if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) +            >= buff->size()) {              // All is good - packet will be sent              fc_cache->byte_count += buff->size();              // Round up to nearest word -            if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) -            { -                fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); +            if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) { +                fc_cache->byte_count += +                    uhd::usrp::DEVICE3_LINE_SIZE +                    - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE);              }              fc_cache->pkt_count++;              return true; @@ -205,33 +206,33 @@ inline bool tx_flow_ctrl(          // Look for a flow control message to update the space available in the buffer.          uhd::transport::managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1); -        if (buff) -        { +        if (buff) {              uhd::transport::vrt::if_packet_info_t if_packet_info; -            if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); -            const uint32_t *packet_buff = buff->cast<const uint32_t *>(); +            if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); +            const uint32_t* packet_buff       = buff->cast<const uint32_t*>();              try {                  fc_cache->unpack(packet_buff, if_packet_info); -            } -            catch(const std::exception &ex) -            { -                UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; +            } catch (const std::exception& ex) { +                UHD_LOGGER_ERROR("TX FLOW CTRL") +                    << "Error unpacking flow control packet: " << ex.what() << std::endl;                  continue;              } -            if (if_packet_info.packet_type != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC) -            { -                UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; +            if (if_packet_info.packet_type +                != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC) { +                UHD_LOGGER_ERROR("TX FLOW CTRL") +                    << "Unexpected packet received by flow control handler: " +                    << if_packet_info.packet_type << std::endl;                  continue;              } -            const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; -            const uint32_t pkt_count = fc_cache->to_host(payload[0]); +            const uint32_t* payload   = &packet_buff[if_packet_info.num_header_words32]; +            const uint32_t pkt_count  = fc_cache->to_host(payload[0]);              const uint32_t byte_count = fc_cache->to_host(payload[1]);              // update the amount of space              fc_cache->last_byte_ack = byte_count; -            fc_cache->last_seq_ack = pkt_count; +            fc_cache->last_seq_ack  = pkt_count;              fc_cache->fc_received = true;          } @@ -239,13 +240,11 @@ inline bool tx_flow_ctrl(      return false;  } -inline void tx_flow_ctrl_ack( -    boost::shared_ptr<tx_fc_cache_t> fc_cache, +inline void tx_flow_ctrl_ack(boost::shared_ptr<tx_fc_cache_t> fc_cache,      uhd::transport::zero_copy_if::sptr send_xport, -    uhd::sid_t send_sid -) { -    if (not fc_cache->fc_received) -    { +    uhd::sid_t send_sid) +{ +    if (not fc_cache->fc_received) {          return;      } @@ -256,42 +255,42 @@ inline void tx_flow_ctrl_ack(          UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer";          return;      } -    uint32_t *pkt = fc_buff->cast<uint32_t *>(); +    uint32_t* pkt = fc_buff->cast<uint32_t*>();      // Load packet info      uhd::transport::vrt::if_packet_info_t packet_info;      packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_ACK;      packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; -    packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); -    packet_info.packet_count = fc_cache->fc_ack_seqnum++; -    packet_info.sob = false; -    packet_info.eob = true; -    packet_info.error = false; -    packet_info.fc_ack = false; -    packet_info.sid = send_sid.get(); -    packet_info.has_sid = true; -    packet_info.has_cid = false; -    packet_info.has_tsi = false; -    packet_info.has_tsf = false; -    packet_info.has_tlr = false; +    packet_info.num_payload_bytes   = packet_info.num_payload_words32 * sizeof(uint32_t); +    packet_info.packet_count        = fc_cache->fc_ack_seqnum++; +    packet_info.sob                 = false; +    packet_info.eob                 = true; +    packet_info.error               = false; +    packet_info.fc_ack              = false; +    packet_info.sid                 = send_sid.get(); +    packet_info.has_sid             = true; +    packet_info.has_cid             = false; +    packet_info.has_tsi             = false; +    packet_info.has_tsf             = false; +    packet_info.has_tlr             = false;      // Load Header:      fc_cache->pack(pkt, packet_info);      // Update counters to include this packet -    size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); +    size_t fc_ack_pkt_size = sizeof(uint32_t) * (packet_info.num_packet_words32);      fc_cache->byte_count += fc_ack_pkt_size;      // Round up to nearest word -    if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) -    { -        fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); +    if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) { +        fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE +                                - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE);      }      fc_cache->pkt_count++;      // Load Payload: Packet count, and byte count -    pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = +    pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] =          fc_cache->from_host(fc_cache->pkt_count); -    pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = +    pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] =          fc_cache->from_host(fc_cache->byte_count);      // Send the buffer over the interface @@ -301,6 +300,6 @@ inline void tx_flow_ctrl_ack(      fc_cache->fc_received = false;  } -}}; +}}; // namespace uhd::usrp  #endif /* INCLUDED_DEVICE3_FLOW_CTRL_HPP */ diff --git a/host/lib/usrp/device3/device3_impl.cpp b/host/lib/usrp/device3/device3_impl.cpp index 5705d6a84..d636b3338 100644 --- a/host/lib/usrp/device3/device3_impl.cpp +++ b/host/lib/usrp/device3/device3_impl.cpp @@ -6,10 +6,10 @@  //  #include "device3_impl.hpp" -#include <uhd/utils/log.hpp>  #include <uhd/rfnoc/block_ctrl_base.hpp> -#include <uhdlib/rfnoc/graph_impl.hpp> +#include <uhd/utils/log.hpp>  #include <uhdlib/rfnoc/ctrl_iface.hpp> +#include <uhdlib/rfnoc/graph_impl.hpp>  #include <boost/make_shared.hpp>  #include <algorithm> @@ -18,21 +18,20 @@ using namespace uhd::usrp;  device3_impl::device3_impl()  {      _type = uhd::device::USRP; -    _async_md.reset(new async_md_type(1000/*messages deep*/)); +    _async_md.reset(new async_md_type(1000 /*messages deep*/));      _tree = uhd::property_tree::make();  };  //! Returns true if the integer value stored in lhs is smaller than that in rhs -bool _compare_string_indexes(const std::string &lhs, const std::string &rhs) +bool _compare_string_indexes(const std::string& lhs, const std::string& rhs)  {      return boost::lexical_cast<size_t>(lhs) < boost::lexical_cast<size_t>(rhs);  } -void device3_impl::merge_channel_defs( -    const std::vector<uhd::rfnoc::block_id_t> &chan_ids, -    const std::vector<uhd::device_addr_t> &chan_args, -    const uhd::direction_t dir -) { +void device3_impl::merge_channel_defs(const std::vector<uhd::rfnoc::block_id_t>& chan_ids, +    const std::vector<uhd::device_addr_t>& chan_args, +    const uhd::direction_t dir) +{      UHD_ASSERT_THROW(chan_ids.size() == chan_args.size());      if (dir == uhd::DX_DIRECTION) {          merge_channel_defs(chan_ids, chan_args, RX_DIRECTION); @@ -40,7 +39,8 @@ void device3_impl::merge_channel_defs(          return;      } -    uhd::fs_path chans_root = uhd::fs_path("/channels/") / (dir == RX_DIRECTION ? "rx" : "tx"); +    uhd::fs_path chans_root = +        uhd::fs_path("/channels/") / (dir == RX_DIRECTION ? "rx" : "tx");      // Store the new positions of the channels:      std::vector<size_t> chan_idxs; @@ -54,18 +54,23 @@ void device3_impl::merge_channel_defs(      // 2. Cycle through existing channels to find out where to merge      //    the new channels. Rules are:      //    - The order of chan_ids must be preserved -    //    - All block indices that are in chan_ids may be overwritten in the channel definition +    //    - All block indices that are in chan_ids may be overwritten in the channel +    //    definition      //    - If the channels in chan_ids are not yet in the property tree channel list,      //      they are appended. -    for(const std::string &chan_idx:  curr_channels) { +    for (const std::string& chan_idx : curr_channels) {          if (_tree->exists(chans_root / chan_idx)) { -            rfnoc::block_id_t chan_block_id = _tree->access<rfnoc::block_id_t>(chans_root / chan_idx).get(); -            if (std::find(chan_ids.begin(), chan_ids.end(), chan_block_id) != chan_ids.end()) { +            rfnoc::block_id_t chan_block_id = +                _tree->access<rfnoc::block_id_t>(chans_root / chan_idx).get(); +            if (std::find(chan_ids.begin(), chan_ids.end(), chan_block_id) +                != chan_ids.end()) {                  chan_idxs.push_back(boost::lexical_cast<size_t>(chan_idx));              }          }      } -    size_t last_chan_idx = curr_channels.empty() ? 0 : (boost::lexical_cast<size_t>(curr_channels.back()) + 1); +    size_t last_chan_idx = curr_channels.empty() +                               ? 0 +                               : (boost::lexical_cast<size_t>(curr_channels.back()) + 1);      while (chan_idxs.size() < chan_ids.size()) {          chan_idxs.push_back(last_chan_idx);          last_chan_idx++; @@ -80,27 +85,28 @@ void device3_impl::merge_channel_defs(          if (not _tree->exists(chans_root / chan_idxs[i] / "args")) {              _tree->create<uhd::device_addr_t>(chans_root / chan_idxs[i] / "args");          } -        _tree->access<uhd::device_addr_t>(chans_root / chan_idxs[i] / "args").set(chan_args[i]); +        _tree->access<uhd::device_addr_t>(chans_root / chan_idxs[i] / "args") +            .set(chan_args[i]);      }  }  /***********************************************************************   * RFNoC-Specific   **********************************************************************/ -void device3_impl::enumerate_rfnoc_blocks( -        size_t device_index, -        size_t n_blocks, -        size_t base_port, -        const uhd::sid_t &base_sid, -        uhd::device_addr_t transport_args -) { +void device3_impl::enumerate_rfnoc_blocks(size_t device_index, +    size_t n_blocks, +    size_t base_port, +    const uhd::sid_t& base_sid, +    uhd::device_addr_t transport_args) +{      // entries that are already connected to this block      uhd::sid_t ctrl_sid = base_sid; -    uhd::property_tree::sptr subtree = _tree->subtree(uhd::fs_path("/mboards") / device_index); +    uhd::property_tree::sptr subtree = +        _tree->subtree(uhd::fs_path("/mboards") / device_index);      // 1) Clean property tree entries      // TODO put this back once radios are actual rfnoc blocks!!!!!! -    //if (subtree->exists("xbar")) { -        //subtree->remove("xbar"); +    // if (subtree->exists("xbar")) { +    // subtree->remove("xbar");      //}      // 2) Destroy existing block controllers      // TODO: Clear out all the old block control classes @@ -109,40 +115,27 @@ void device3_impl::enumerate_rfnoc_blocks(          // First, make a transport for port number zero, because we always need that:          ctrl_sid.set_dst_xbarport(base_port + i);          ctrl_sid.set_dst_blockport(0); -        both_xports_t xport = this->make_transport( -            ctrl_sid, -            CTRL, -            transport_args -        ); +        both_xports_t xport = this->make_transport(ctrl_sid, CTRL, transport_args);          UHD_LOG_TRACE("DEVICE3",              str(boost::format("Setting up NoC-Shell Control for port #0 (SID: %s)...") -                % xport.send_sid.to_pp_string_hex()) -        ); -        uhd::rfnoc::ctrl_iface::sptr ctrl = uhd::rfnoc::ctrl_iface::make( -            xport, -            str(boost::format("CE_%02d_Port_%02X") -                % i -                % ctrl_sid.get_dst_endpoint()) -        ); -        uint64_t noc_id = ctrl->send_cmd_pkt( -                uhd::rfnoc::SR_READBACK, -                uhd::rfnoc::SR_READBACK_REG_ID, -                true -        ); -        UHD_LOG_DEBUG("DEVICE3", str( -            boost::format("Port 0x%02X: Found NoC-Block with ID %016X.") -                % int(ctrl_sid.get_dst_endpoint()) -                % noc_id -        )); +                % xport.send_sid.to_pp_string_hex())); +        uhd::rfnoc::ctrl_iface::sptr ctrl = uhd::rfnoc::ctrl_iface::make(xport, +            str(boost::format("CE_%02d_Port_%02X") % i % ctrl_sid.get_dst_endpoint())); +        uint64_t noc_id                   = ctrl->send_cmd_pkt( +            uhd::rfnoc::SR_READBACK, uhd::rfnoc::SR_READBACK_REG_ID, true); +        UHD_LOG_DEBUG("DEVICE3", +            str(boost::format("Port 0x%02X: Found NoC-Block with ID %016X.") +                % int(ctrl_sid.get_dst_endpoint()) % noc_id));          uhd::rfnoc::make_args_t make_args; -        uhd::rfnoc::blockdef::sptr block_def = uhd::rfnoc::blockdef::make_from_noc_id(noc_id); +        uhd::rfnoc::blockdef::sptr block_def = +            uhd::rfnoc::blockdef::make_from_noc_id(noc_id);          if (not block_def) {              UHD_LOG_WARNING("DEVICE3",                  "No block definition found, using default block configuration " -                "for block with NOC ID: " + str(boost::format("0x%08X") % noc_id) -            ); -            block_def = uhd::rfnoc::blockdef::make_from_noc_id( -                uhd::rfnoc::DEFAULT_NOC_ID); +                "for block with NOC ID: " +                    + str(boost::format("0x%08X") % noc_id)); +            block_def = +                uhd::rfnoc::blockdef::make_from_noc_id(uhd::rfnoc::DEFAULT_NOC_ID);          }          UHD_ASSERT_THROW(block_def);          make_args.ctrl_ifaces[0] = ctrl; @@ -151,71 +144,59 @@ void device3_impl::enumerate_rfnoc_blocks(                  continue;              }              ctrl_sid.set_dst_blockport(port_number); -            both_xports_t xport1 = this->make_transport( -                ctrl_sid, -                CTRL, -                transport_args -            ); -            UHD_LOG_TRACE("DEVICE3", str( -                    boost::format("Setting up NoC-Shell Control for port #%d " +            both_xports_t xport1 = this->make_transport(ctrl_sid, CTRL, transport_args); +            UHD_LOG_TRACE("DEVICE3", +                str(boost::format("Setting up NoC-Shell Control for port #%d "                                    "(SID: %s)...") -                    % port_number -                    % xport1.send_sid.to_pp_string_hex() -            )); -            uhd::rfnoc::ctrl_iface::sptr ctrl1 = uhd::rfnoc::ctrl_iface::make( -                    xport1, -                    str(boost::format("CE_%02d_Port_%02X") % i % ctrl_sid.get_dst_endpoint()) -            ); +                    % port_number % xport1.send_sid.to_pp_string_hex())); +            uhd::rfnoc::ctrl_iface::sptr ctrl1 = uhd::rfnoc::ctrl_iface::make(xport1, +                str(boost::format("CE_%02d_Port_%02X") % i +                    % ctrl_sid.get_dst_endpoint()));              make_args.ctrl_ifaces[port_number] = ctrl1;          }          UHD_LOG_TRACE("DEVICE3", -            "All control transports successfully created for block with ID " << -            str(boost::format("0x%08X") % noc_id) -        ); +            "All control transports successfully created for block with ID " +                << str(boost::format("0x%08X") % noc_id));          make_args.base_address = xport.send_sid.get_dst();          make_args.device_index = device_index; -        make_args.tree = subtree; -        {   //Critical section for block_ctrl vector access +        make_args.tree         = subtree; +        { // Critical section for block_ctrl vector access              boost::lock_guard<boost::mutex> lock(_block_ctrl_mutex); -            _rfnoc_block_ctrl.push_back(uhd::rfnoc::block_ctrl_base::make(make_args, noc_id)); +            _rfnoc_block_ctrl.push_back( +                uhd::rfnoc::block_ctrl_base::make(make_args, noc_id));          }      }  } -uhd::rfnoc::graph::sptr device3_impl::create_graph(const std::string &name) +uhd::rfnoc::graph::sptr device3_impl::create_graph(const std::string& name)  {      // Create an async message handler -    UHD_LOGGER_TRACE("DEVICE3") << "Creating async message handler for graph `" << name << "'..."; -    // FIXME: right now this only can only handle source sid of 0 and xbar local addr of 2. -    // This is ok for now because that most of our device has xbard local addr hardcode to 2. +    UHD_LOGGER_TRACE("DEVICE3") +        << "Creating async message handler for graph `" << name << "'..."; +    // FIXME: right now this only can only handle source sid of 0 and xbar local addr +    // of 2. This is ok for now because that most of our device has xbard local addr +    // hardcode to 2.      sid_t async_sid(0);      async_sid.set_dst_addr(2); -    both_xports_t async_xports = make_transport( -            async_sid, -            ASYNC_MSG, -            //FIXME: only get rx_hints from mb index of 0 -            get_rx_hints(0) -    ); +    both_xports_t async_xports = make_transport(async_sid, +        ASYNC_MSG, +        // FIXME: only get rx_hints from mb index of 0 +        get_rx_hints(0));      UHD_LOGGER_TRACE("DEVICE3") << " Async transport ready." << std::endl;      uhd::rfnoc::async_msg_handler::sptr async_msg_handler = -        uhd::rfnoc::async_msg_handler::make( -                async_xports.recv, -                async_xports.send, -                async_xports.send_sid, -                async_xports.endianness -        ); -    UHD_LOGGER_TRACE("DEVICE3") << "Async message has address " << async_xports.send_sid << std::endl; +        uhd::rfnoc::async_msg_handler::make(async_xports.recv, +            async_xports.send, +            async_xports.send_sid, +            async_xports.endianness); +    UHD_LOGGER_TRACE("DEVICE3") +        << "Async message has address " << async_xports.send_sid << std::endl;      // Create the graph      UHD_LOGGER_TRACE("DEVICE3") << "Creating graph `" << name << "'..." << std::endl;      uhd::rfnoc::graph::sptr graph = boost::make_shared<uhd::rfnoc::graph_impl>( -            name, -            shared_from_this(), -            async_msg_handler -    ); +        name, shared_from_this(), async_msg_handler);      return graph;  } - diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp index e82597b9b..3bf6f6111 100644 --- a/host/lib/usrp/device3/device3_impl.hpp +++ b/host/lib/usrp/device3/device3_impl.hpp @@ -11,20 +11,20 @@  #ifndef INCLUDED_DEVICE3_IMPL_HPP  #define INCLUDED_DEVICE3_IMPL_HPP +#include "../../transport/super_recv_packet_handler.hpp" +#include "../../transport/super_send_packet_handler.hpp" +#include <uhd/device3.hpp>  #include <uhd/transport/bounded_buffer.hpp> -#include <uhd/transport/vrt_if_packet.hpp>  #include <uhd/transport/chdr.hpp> +#include <uhd/transport/vrt_if_packet.hpp>  #include <uhd/transport/zero_copy.hpp> -#include <uhd/types/sid.hpp> -#include <uhd/types/metadata.hpp> -#include <uhd/types/endianness.hpp>  #include <uhd/types/direction.hpp> +#include <uhd/types/endianness.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/types/sid.hpp>  #include <uhd/utils/tasks.hpp> -#include <uhd/device3.hpp> -#include "../../transport/super_send_packet_handler.hpp" -#include "../../transport/super_recv_packet_handler.hpp" -#include <uhdlib/rfnoc/tx_stream_terminator.hpp>  #include <uhdlib/rfnoc/rx_stream_terminator.hpp> +#include <uhdlib/rfnoc/tx_stream_terminator.hpp>  #include <uhdlib/rfnoc/xports.hpp>  namespace uhd { namespace usrp { @@ -32,31 +32,33 @@ namespace uhd { namespace usrp {  /***********************************************************************   * Default settings (any device3 may override these)   **********************************************************************/ -static const size_t DEVICE3_RX_FC_REQUEST_FREQ         = 32;    //per flow-control window -static const size_t DEVICE3_TX_FC_RESPONSE_FREQ        = 8; -static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32   = 2; -static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET     = 0; -static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET       = 1; -static const size_t DEVICE3_LINE_SIZE                  = 8; - -static const size_t DEVICE3_TX_MAX_HDR_LEN             = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t);    // Bytes -static const size_t DEVICE3_RX_MAX_HDR_LEN             = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t);    // Bytes - -// This class manages the lifetime of the TX async message handler task, transports, and terminator +static const size_t DEVICE3_RX_FC_REQUEST_FREQ       = 32; // per flow-control window +static const size_t DEVICE3_TX_FC_RESPONSE_FREQ      = 8; +static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32 = 2; +static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET   = 0; +static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET     = 1; +static const size_t DEVICE3_LINE_SIZE                = 8; + +static const size_t DEVICE3_TX_MAX_HDR_LEN = +    uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes +static const size_t DEVICE3_RX_MAX_HDR_LEN = +    uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes + +// This class manages the lifetime of the TX async message handler task, transports, and +// terminator  class device3_send_packet_streamer : public uhd::transport::sph::send_packet_streamer  {  public: -    device3_send_packet_streamer( -            const size_t max_num_samps, -            const uhd::rfnoc::tx_stream_terminator::sptr terminator, -            const both_xports_t data_xport, -            const both_xports_t async_msg_xport -    ) : -        uhd::transport::sph::send_packet_streamer(max_num_samps), -        _terminator(terminator), -        _data_xport(data_xport), -        _async_msg_xport(async_msg_xport) -    {} +    device3_send_packet_streamer(const size_t max_num_samps, +        const uhd::rfnoc::tx_stream_terminator::sptr terminator, +        const both_xports_t data_xport, +        const both_xports_t async_msg_xport) +        : uhd::transport::sph::send_packet_streamer(max_num_samps) +        , _terminator(terminator) +        , _data_xport(data_xport) +        , _async_msg_xport(async_msg_xport) +    { +    }      ~device3_send_packet_streamer()      { @@ -81,18 +83,19 @@ private:      std::vector<task::sptr> _tx_async_msg_tasks;  }; -// This class manages the lifetime of the RX transports and terminator and provides access to both +// This class manages the lifetime of the RX transports and terminator and provides access +// to both  class device3_recv_packet_streamer : public uhd::transport::sph::recv_packet_streamer  {  public: -    device3_recv_packet_streamer( -            const size_t max_num_samps, -            const uhd::rfnoc::rx_stream_terminator::sptr terminator, -            const both_xports_t xport -        ) : -            uhd::transport::sph::recv_packet_streamer(max_num_samps), -            _terminator(terminator), -            _xport(xport) {} +    device3_recv_packet_streamer(const size_t max_num_samps, +        const uhd::rfnoc::rx_stream_terminator::sptr terminator, +        const both_xports_t xport) +        : uhd::transport::sph::recv_packet_streamer(max_num_samps) +        , _terminator(terminator) +        , _xport(xport) +    { +    }      ~device3_recv_packet_streamer() {} @@ -111,7 +114,8 @@ private:      both_xports_t _xport;  }; -class device3_impl : public uhd::device3, public boost::enable_shared_from_this<device3_impl> +class device3_impl : public uhd::device3, +                     public boost::enable_shared_from_this<device3_impl>  {  public:      /*********************************************************************** @@ -120,14 +124,9 @@ public:      typedef uhd::transport::bounded_buffer<uhd::async_metadata_t> async_md_type;      //! The purpose of a transport -    enum xport_type_t { -        CTRL = 0, -        ASYNC_MSG, -        TX_DATA, -        RX_DATA -    }; +    enum xport_type_t { CTRL = 0, ASYNC_MSG, TX_DATA, RX_DATA }; -    enum xport_t {AXI, ETH, PCIE}; +    enum xport_t { AXI, ETH, PCIE };      //! Stores all streaming-related options      struct stream_options_t @@ -145,20 +144,21 @@ public:              , rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN)              , rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ)              , tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ) -        {} +        { +        }      };      /***********************************************************************       * I/O Interface       **********************************************************************/ -    uhd::tx_streamer::sptr get_tx_stream(const uhd::stream_args_t &); -    uhd::rx_streamer::sptr get_rx_stream(const uhd::stream_args_t &); -    bool recv_async_msg(uhd::async_metadata_t &async_metadata, double timeout); +    uhd::tx_streamer::sptr get_tx_stream(const uhd::stream_args_t&); +    uhd::rx_streamer::sptr get_rx_stream(const uhd::stream_args_t&); +    bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout);      /***********************************************************************       * Other public APIs       **********************************************************************/ -    rfnoc::graph::sptr create_graph(const std::string &name=""); +    rfnoc::graph::sptr create_graph(const std::string& name = "");  protected:      /*********************************************************************** @@ -172,10 +172,10 @@ protected:       **********************************************************************/      // The 'rate' argument is so we can use these as subscribers to rate changes  public: // TODO make these protected again -    void update_rx_streamers(double rate=-1.0); -    void update_tx_streamers(double rate=-1.0); -protected: +    void update_rx_streamers(double rate = -1.0); +    void update_tx_streamers(double rate = -1.0); +protected:      /***********************************************************************       * Transport-related       **********************************************************************/ @@ -187,17 +187,21 @@ protected:       *                The source address in this value is not considered, only the       *                destination address.       * \param xport_type Specify which kind of transport this is. -     * \param args Additional arguments for the transport generation. See \ref page_transport -     *             for valid arguments. +     * \param args Additional arguments for the transport generation. See \ref +     * page_transport for valid arguments.       */ -    virtual uhd::both_xports_t make_transport( -        const uhd::sid_t &address, +    virtual uhd::both_xports_t make_transport(const uhd::sid_t& address,          const xport_type_t xport_type, -        const uhd::device_addr_t& args -    ) = 0; +        const uhd::device_addr_t& args) = 0; -    virtual uhd::device_addr_t get_tx_hints(size_t) { return uhd::device_addr_t(); } -    virtual uhd::device_addr_t get_rx_hints(size_t) { return uhd::device_addr_t(); } +    virtual uhd::device_addr_t get_tx_hints(size_t) +    { +        return uhd::device_addr_t(); +    } +    virtual uhd::device_addr_t get_rx_hints(size_t) +    { +        return uhd::device_addr_t(); +    }      //! Is called after a streamer is generated      virtual void post_streamer_hooks(uhd::direction_t) {} @@ -216,29 +220,25 @@ protected:       * \param chan_args New channel args. Must have same length as chan_ids.       *       */ -    void merge_channel_defs( -            const std::vector<rfnoc::block_id_t> &chan_ids, -            const std::vector<uhd::device_addr_t> &chan_args, -            const uhd::direction_t dir -    ); +    void merge_channel_defs(const std::vector<rfnoc::block_id_t>& chan_ids, +        const std::vector<uhd::device_addr_t>& chan_args, +        const uhd::direction_t dir);      /***********************************************************************       * RFNoC-Specific       **********************************************************************/ -    void enumerate_rfnoc_blocks( -            size_t device_index, -            size_t n_blocks, -            size_t base_port, -            const uhd::sid_t &base_sid, -            uhd::device_addr_t transport_args -    ); +    void enumerate_rfnoc_blocks(size_t device_index, +        size_t n_blocks, +        size_t base_port, +        const uhd::sid_t& base_sid, +        uhd::device_addr_t transport_args);      /***********************************************************************       * Members       **********************************************************************/      // TODO: Maybe move these to private -    uhd::dict<std::string, boost::weak_ptr<uhd::rx_streamer> > _rx_streamers; -    uhd::dict<std::string, boost::weak_ptr<uhd::tx_streamer> > _tx_streamers; +    uhd::dict<std::string, boost::weak_ptr<uhd::rx_streamer>> _rx_streamers; +    uhd::dict<std::string, boost::weak_ptr<uhd::tx_streamer>> _tx_streamers;  private:      /*********************************************************************** diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index 7afa2ace0..c0f91368d 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -7,16 +7,16 @@  // Provides streaming-related functions which are used by device3 objects. -#include "device3_impl.hpp"  #include "device3_flow_ctrl.hpp" +#include "device3_impl.hpp"  #include <uhd/rfnoc/constants.hpp> -#include <uhd/rfnoc/source_block_ctrl_base.hpp> +#include <uhd/rfnoc/radio_ctrl.hpp> +#include <uhd/rfnoc/rate_node_ctrl.hpp>  #include <uhd/rfnoc/sink_block_ctrl_base.hpp> +#include <uhd/rfnoc/source_block_ctrl_base.hpp> +#include <uhd/transport/zero_copy_flow_ctrl.hpp>  #include <uhd/utils/byteswap.hpp>  #include <uhd/utils/log.hpp> -#include <uhd/rfnoc/rate_node_ctrl.hpp> -#include <uhd/rfnoc/radio_ctrl.hpp> -#include <uhd/transport/zero_copy_flow_ctrl.hpp>  #include <uhdlib/rfnoc/rx_stream_terminator.hpp>  #include <uhdlib/rfnoc/tx_stream_terminator.hpp>  #include <uhdlib/usrp/common/async_packet_handler.hpp> @@ -32,7 +32,7 @@ using namespace uhd::transport;  /***********************************************************************   * Helper functions for get_?x_stream()   **********************************************************************/ -static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t &args_) +static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t& args_)  {      uhd::stream_args_t args = args_;      if (args.channels.empty()) { @@ -42,31 +42,33 @@ static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t &args_)      return args;  } -static void check_stream_sig_compatible(const rfnoc::stream_sig_t &stream_sig, stream_args_t &args, const std::string &tx_rx) +static void check_stream_sig_compatible( +    const rfnoc::stream_sig_t& stream_sig, stream_args_t& args, const std::string& tx_rx)  {      if (args.otw_format.empty()) {          if (stream_sig.item_type.empty()) { -            throw uhd::runtime_error(str( -                    boost::format("[%s Streamer] No otw_format defined!") % tx_rx -            )); +            throw uhd::runtime_error( +                str(boost::format("[%s Streamer] No otw_format defined!") % tx_rx));          } else {              args.otw_format = stream_sig.item_type;          } -    } else if (not stream_sig.item_type.empty() and stream_sig.item_type != args.otw_format) { -        throw uhd::runtime_error(str( -                boost::format("[%s Streamer] Conflicting OTW types defined: args.otw_format = '%s' <=> stream_sig.item_type = '%s'") -                % tx_rx % args.otw_format % stream_sig.item_type -        )); +    } else if (not stream_sig.item_type.empty() +               and stream_sig.item_type != args.otw_format) { +        throw uhd::runtime_error( +            str(boost::format("[%s Streamer] Conflicting OTW types defined: " +                              "args.otw_format = '%s' <=> stream_sig.item_type = '%s'") +                % tx_rx % args.otw_format % stream_sig.item_type));      }      const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item      if (stream_sig.packet_size) {          if (args.args.has_key("spp")) {              size_t args_spp = args.args.cast<size_t>("spp", 0);              if (args_spp * bpi != stream_sig.packet_size) { -                throw uhd::runtime_error(str( -                        boost::format("[%s Streamer] Conflicting packet sizes defined: args yields %d bytes but stream_sig.packet_size is %d bytes") -                        % tx_rx % (args_spp * bpi) % stream_sig.packet_size -                )); +                throw uhd::runtime_error( +                    str(boost::format( +                            "[%s Streamer] Conflicting packet sizes defined: args yields " +                            "%d bytes but stream_sig.packet_size is %d bytes") +                        % tx_rx % (args_spp * bpi) % stream_sig.packet_size));              }          } else {              args.args["spp"] = str(boost::format("%d") % (stream_sig.packet_size / bpi)); @@ -82,19 +84,18 @@ static void check_stream_sig_compatible(const rfnoc::stream_sig_t &stream_sig, s   *   * \param args_ Stream args.   * \param[out] chan_list The list of channels in the correct order. - * \param[out] chan_args Channel args for every channel. `chan_args.size() == chan_list.size()` + * \param[out] chan_args Channel args for every channel. `chan_args.size() == + * chan_list.size()`   */ -void generate_channel_list( -        const uhd::stream_args_t &args_, -        std::vector<uhd::rfnoc::block_id_t> &chan_list, -        std::vector<device_addr_t> &chan_args -) { +void generate_channel_list(const uhd::stream_args_t& args_, +    std::vector<uhd::rfnoc::block_id_t>& chan_list, +    std::vector<device_addr_t>& chan_args) +{      uhd::stream_args_t args = args_;      std::vector<uhd::rfnoc::block_id_t> chan_list_(args.channels.size());      std::vector<device_addr_t> chan_args_(args.channels.size()); -    for (size_t i = 0; i < args.channels.size(); i++) -    { +    for (size_t i = 0; i < args.channels.size(); i++) {          // Extract block ID          size_t chan_idx = args.channels[i];          std::string key = str(boost::format("block_id%d") % chan_idx); @@ -103,10 +104,10 @@ void generate_channel_list(          } else if (args.args.has_key("block_id")) {              chan_list_[i] = args.args["block_id"];          } else { -            throw uhd::runtime_error(str( -                boost::format("Cannot create streamers: No block_id specified for channel %d.") -                % chan_idx -            )); +            throw uhd::runtime_error( +                str(boost::format( +                        "Cannot create streamers: No block_id specified for channel %d.") +                    % chan_idx));          }          // Split off known channel specific args @@ -125,7 +126,7 @@ void generate_channel_list(      }      // Add all remaining args to all channel args -    for(device_addr_t &chan_arg:  chan_args_) { +    for (device_addr_t& chan_arg : chan_args_) {          chan_arg = chan_arg.to_string() + "," + args.args.to_string();      } @@ -154,25 +155,20 @@ void generate_channel_list(   *  \returns The size of the flow control window in number of packets   */  static size_t get_rx_flow_control_window( -        size_t pkt_size, -        size_t sw_buff_size, -        const device_addr_t& rx_args -) { +    size_t pkt_size, size_t sw_buff_size, const device_addr_t& rx_args) +{      double fullness_factor = rx_args.cast<double>( -            "recv_buff_fullness", -            uhd::rfnoc::DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR -    ); +        "recv_buff_fullness", uhd::rfnoc::DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR);      if (fullness_factor < 0.01 || fullness_factor > 1) { -        throw uhd::value_error("recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)"); +        throw uhd::value_error( +            "recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)");      }      size_t window_in_bytes = (static_cast<size_t>(sw_buff_size * fullness_factor));      if (rx_args.has_key("max_recv_window")) {          window_in_bytes = std::min( -            window_in_bytes, -            rx_args.cast<size_t>("max_recv_window", window_in_bytes) -        ); +            window_in_bytes, rx_args.cast<size_t>("max_recv_window", window_in_bytes));      }      if (window_in_bytes < pkt_size) {          throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size."); @@ -200,32 +196,28 @@ struct async_tx_info_t   *   * This is run inside a uhd::task as long as this streamer lives.   */ -static void handle_tx_async_msgs( -        boost::shared_ptr<async_tx_info_t> async_info, -        zero_copy_if::sptr xport, -        uint32_t (*to_host)(uint32_t), -        void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), -        boost::function<double(void)> get_tick_rate -) { +static void handle_tx_async_msgs(boost::shared_ptr<async_tx_info_t> async_info, +    zero_copy_if::sptr xport, +    uint32_t (*to_host)(uint32_t), +    void (*unpack)(const uint32_t* packet_buff, vrt::if_packet_info_t&), +    boost::function<double(void)> get_tick_rate) +{      managed_recv_buffer::sptr buff = xport->get_recv_buff(); -    if (not buff) -    { +    if (not buff) {          return;      } -    //extract packet info +    // extract packet info      vrt::if_packet_info_t if_packet_info; -    if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); -    const uint32_t *packet_buff = buff->cast<const uint32_t *>(); +    if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); +    const uint32_t* packet_buff       = buff->cast<const uint32_t*>(); -    //unpacking can fail -    try -    { +    // unpacking can fail +    try {          unpack(packet_buff, if_packet_info); -    } -    catch(const std::exception &ex) -    { -        UHD_LOGGER_ERROR("STREAMER") << "Error parsing async message packet: " << ex.what() ; +    } catch (const std::exception& ex) { +        UHD_LOGGER_ERROR("STREAMER") +            << "Error parsing async message packet: " << ex.what();          return;      } @@ -234,21 +226,20 @@ static void handle_tx_async_msgs(          tick_rate = 1;      } -    //fill in the async metadata +    // fill in the async metadata      async_metadata_t metadata; -    load_metadata_from_buff( -            to_host, -            metadata, -            if_packet_info, -            packet_buff, -            tick_rate, -            async_info->stream_channel -    ); +    load_metadata_from_buff(to_host, +        metadata, +        if_packet_info, +        packet_buff, +        tick_rate, +        async_info->stream_channel);      // Filter out any flow control messages and cache the rest -    if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) -    { -        UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl; +    if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { +        UHD_LOGGER_ERROR("TX ASYNC MSG") +            << "Unexpected flow control message found in async message handling" +            << std::endl;      } else {          async_info->async_queue->push_with_pop_on_full(metadata);          metadata.channel = async_info->device_channel; @@ -257,9 +248,7 @@ static void handle_tx_async_msgs(      }  } -bool device3_impl::recv_async_msg( -    async_metadata_t &async_metadata, double timeout -) +bool device3_impl::recv_async_msg(async_metadata_t& async_metadata, double timeout)  {      return _async_md->pop_with_timed_wait(async_metadata, timeout);  } @@ -269,10 +258,11 @@ bool device3_impl::recv_async_msg(   **********************************************************************/  void device3_impl::update_rx_streamers(double /* rate */)  { -    for(const std::string &block_id:  _rx_streamers.keys()) { +    for (const std::string& block_id : _rx_streamers.keys()) {          UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id;          boost::shared_ptr<device3_recv_packet_streamer> my_streamer = -            boost::dynamic_pointer_cast<device3_recv_packet_streamer>(_rx_streamers[block_id].lock()); +            boost::dynamic_pointer_cast<device3_recv_packet_streamer>( +                _rx_streamers[block_id].lock());          if (my_streamer) {              double tick_rate = my_streamer->get_terminator()->get_tick_rate();              if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -285,9 +275,11 @@ void device3_impl::update_rx_streamers(double /* rate */)              }              double scaling = my_streamer->get_terminator()->get_output_scale_factor();              if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) { -                scaling = 1/32767.; +                scaling = 1 / 32767.;              } -            UHD_RX_STREAMER_LOG() << "  New tick_rate == " << tick_rate << "  New samp_rate == " << samp_rate << " New scaling == " << scaling ; +            UHD_RX_STREAMER_LOG() +                << "  New tick_rate == " << tick_rate +                << "  New samp_rate == " << samp_rate << " New scaling == " << scaling;              my_streamer->set_tick_rate(tick_rate);              my_streamer->set_samp_rate(samp_rate); @@ -296,7 +288,7 @@ void device3_impl::update_rx_streamers(double /* rate */)      }  } -rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) +rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t& args_)  {      boost::mutex::scoped_lock lock(_transport_setup_mutex);      stream_args_t args = sanitize_stream_args(args_); @@ -312,88 +304,83 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)      // The terminator's lifetime is coupled to the streamer.      // There is only one terminator. If the streamer has multiple channels,      // it will be connected to each upstream block. -    rfnoc::rx_stream_terminator::sptr recv_terminator = rfnoc::rx_stream_terminator::make(); -    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) -    { +    rfnoc::rx_stream_terminator::sptr recv_terminator = +        rfnoc::rx_stream_terminator::make(); +    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) {          // First, configure blocks and create transport          // Get block ID and mb index          uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; -        UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id ; +        UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id;          // Update args so args.args is always valid for this particular channel: -        args.args = chan_args[stream_i]; +        args.args       = chan_args[stream_i];          size_t mb_index = block_id.get_device_no(); -        size_t suggested_block_port = args.args.cast<size_t>("block_port", rfnoc::ANY_PORT); +        size_t suggested_block_port = +            args.args.cast<size_t>("block_port", rfnoc::ANY_PORT);          // Access to this channel's block control          uhd::rfnoc::source_block_ctrl_base::sptr blk_ctrl = -            boost::dynamic_pointer_cast<uhd::rfnoc::source_block_ctrl_base>(get_block_ctrl(block_id)); +            boost::dynamic_pointer_cast<uhd::rfnoc::source_block_ctrl_base>( +                get_block_ctrl(block_id));          // Connect the terminator with this channel's block.          size_t block_port = blk_ctrl->connect_downstream( -                recv_terminator, -                suggested_block_port, -                args.args -        ); +            recv_terminator, suggested_block_port, args.args);          const size_t terminator_port = recv_terminator->connect_upstream(blk_ctrl);          blk_ctrl->set_downstream_port(block_port, terminator_port);          recv_terminator->set_upstream_port(terminator_port, block_port);          // Check if the block connection is compatible (spp and item type) -        check_stream_sig_compatible(blk_ctrl->get_output_signature(block_port), args, "RX"); +        check_stream_sig_compatible( +            blk_ctrl->get_output_signature(block_port), args, "RX");          // Setup the DSP transport hints          device_addr_t rx_hints = get_rx_hints(mb_index); -        //allocate sid and create transport +        // allocate sid and create transport          uhd::sid_t stream_address = blk_ctrl->get_address(block_port); -        UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string() ; +        UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string();          both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints); -        UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size; +        UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec +                              << " actual recv_buff_size = " << xport.recv_buff_size;          // Configure the block          // Flow control setup          const size_t pkt_size = xport.recv->get_recv_frame_size();          // Leave one pkt_size space for overrun packets - TODO make this obsolete -        const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - pkt_size; -        const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); -        UHD_RX_STREAMER_LOG()<< "Flow Control Window = " << (fc_window) << ", Flow Control Handler Window = " << fc_handle_window; -        blk_ctrl->configure_flow_control_out( -            true, +        const size_t fc_window = +            get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) +            - pkt_size; +        const size_t fc_handle_window = +            std::max<size_t>(1, fc_window / stream_options.rx_fc_request_freq); +        UHD_RX_STREAMER_LOG() << "Flow Control Window = " << (fc_window) +                              << ", Flow Control Handler Window = " << fc_handle_window; +        blk_ctrl->configure_flow_control_out(true,              fc_window, -            rx_hints.cast<size_t>("recv_pkt_limit", 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 -            block_port -        ); +            rx_hints.cast<size_t>("recv_pkt_limit", +                0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 +            block_port);          // Add flow control transport          boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); -        fc_cache->sid = xport.send_sid; -        fc_cache->xport = xport.send; +        fc_cache->sid      = xport.send_sid; +        fc_cache->xport    = xport.send;          fc_cache->interval = fc_handle_window; -        if (xport.endianness == ENDIANNESS_BIG) -        { -            fc_cache->to_host = uhd::ntohx<uint32_t>; +        if (xport.endianness == ENDIANNESS_BIG) { +            fc_cache->to_host   = uhd::ntohx<uint32_t>;              fc_cache->from_host = uhd::htonx<uint32_t>; -            fc_cache->pack = vrt::chdr::if_hdr_pack_be; -            fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; -        } -        else -        { -            fc_cache->to_host = uhd::wtohx<uint32_t>; +            fc_cache->pack      = vrt::chdr::if_hdr_pack_be; +            fc_cache->unpack    = vrt::chdr::if_hdr_unpack_be; +        } else { +            fc_cache->to_host   = uhd::wtohx<uint32_t>;              fc_cache->from_host = uhd::htowx<uint32_t>; -            fc_cache->pack = vrt::chdr::if_hdr_pack_le; -            fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; +            fc_cache->pack      = vrt::chdr::if_hdr_pack_le; +            fc_cache->unpack    = vrt::chdr::if_hdr_unpack_le;          } -        xport.recv = zero_copy_flow_ctrl::make -        ( -            xport.recv, -            NULL, -            [fc_cache](managed_buffer::sptr buff) { -                return rx_flow_ctrl( -                    fc_cache, -                    buff); -            } -        ); +        xport.recv = zero_copy_flow_ctrl::make( +            xport.recv, NULL, [fc_cache](managed_buffer::sptr buff) { +                return rx_flow_ctrl(fc_cache, buff); +            });          // Configure the block          // Note: We need to set_destination() after writing to SR_CLEAR_TX_FC. @@ -406,36 +393,41 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)          blk_ctrl->set_destination(xport.send_sid.get_src(), block_port);          // Configure routing for responses -        blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); -        UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src() ; +        blk_ctrl->sr_write( +            uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); +        UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src();          // Find all upstream radio nodes and set their response in SID to the host -        std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > upstream_radio_nodes = blk_ctrl->find_upstream_node<uhd::rfnoc::radio_ctrl>(); -        UHD_RX_STREAMER_LOG() << "Number of upstream radio nodes: " << upstream_radio_nodes.size(); -        for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node:  upstream_radio_nodes) { -            node->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); +        std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl>> upstream_radio_nodes = +            blk_ctrl->find_upstream_node<uhd::rfnoc::radio_ctrl>(); +        UHD_RX_STREAMER_LOG() << "Number of upstream radio nodes: " +                              << upstream_radio_nodes.size(); +        for (const boost::shared_ptr<uhd::rfnoc::radio_ctrl>& node : +            upstream_radio_nodes) { +            node->sr_write( +                uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port);          }          // Second, configure the streamer -        //make the new streamer given the samples per packet -        if (not my_streamer) -        { -            // To calculate the max number of samples per packet, we assume the maximum header length -            // to avoid fragmentation should the entire header be used. -            const size_t bpp = pkt_size - stream_options.rx_max_len_hdr; // bytes per packet -            const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item -            const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet -            UHD_RX_STREAMER_LOG() << "spp == " << spp ; +        // make the new streamer given the samples per packet +        if (not my_streamer) { +            // To calculate the max number of samples per packet, we assume the maximum +            // header length to avoid fragmentation should the entire header be used. +            const size_t bpp = +                pkt_size - stream_options.rx_max_len_hdr; // bytes per packet +            const size_t bpi = +                convert::get_bytes_per_item(args.otw_format); // bytes per item +            const size_t spp = std::min(args.args.cast<size_t>("spp", bpp / bpi), +                bpp / bpi); // samples per packet +            UHD_RX_STREAMER_LOG() << "spp == " << spp;              my_streamer = boost::make_shared<device3_recv_packet_streamer>( -                    spp, -                    recv_terminator, -                    xport); +                spp, recv_terminator, xport);              my_streamer->resize(chan_list.size());          } -        //init some streamer stuff +        // init some streamer stuff          std::string conv_endianness;          if (xport.endianness == ENDIANNESS_BIG) {              my_streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be); @@ -445,63 +437,51 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)              conv_endianness = "le";          } -        //set the converter +        // set the converter          uhd::convert::id_type id; -        id.input_format = args.otw_format + "_item32_" + conv_endianness; -        id.num_inputs = 1; +        id.input_format  = args.otw_format + "_item32_" + conv_endianness; +        id.num_inputs    = 1;          id.output_format = args.cpu_format; -        id.num_outputs = 1; +        id.num_outputs   = 1;          my_streamer->set_converter(id);          // Give the streamer a functor to handle flow control ACK messages          my_streamer->set_xport_handle_flowctrl_ack( -            stream_i, -            [fc_cache](const uint32_t *payload) { -                handle_rx_flowctrl_ack( -                        fc_cache, -                        payload -                ); -            } -        ); +            stream_i, [fc_cache](const uint32_t* payload) { +                handle_rx_flowctrl_ack(fc_cache, payload); +            }); -        //Give the streamer a functor to get the recv_buffer -        my_streamer->set_xport_chan_get_buff( -            stream_i, -            [xport](double timeout) { -                return xport.recv->get_recv_buff(timeout); -            }, +        // Give the streamer a functor to get the recv_buffer +        my_streamer->set_xport_chan_get_buff(stream_i, +            [xport](double timeout) { return xport.recv->get_recv_buff(timeout); },              true /*flush*/          ); -        //Give the streamer a functor to handle overruns -        //bind requires a weak_ptr to break the a streamer->streamer circular dependency -        //Using "this" is OK because we know that this device3_impl will outlive the streamer +        // Give the streamer a functor to handle overruns +        // bind requires a weak_ptr to break the a streamer->streamer circular dependency +        // Using "this" is OK because we know that this device3_impl will outlive the +        // streamer          boost::weak_ptr<uhd::rx_streamer> weak_ptr(my_streamer);          my_streamer->set_overflow_handler( -            stream_i, -            [recv_terminator, weak_ptr, stream_i]() { -                recv_terminator->handle_overrun( -                        weak_ptr, -                        stream_i); -            } -        ); +            stream_i, [recv_terminator, weak_ptr, stream_i]() { +                recv_terminator->handle_overrun(weak_ptr, stream_i); +            }); -        //Give the streamer a functor issue stream cmd +        // Give the streamer a functor issue stream cmd          my_streamer->set_issue_stream_cmd( -            stream_i, -            [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) { +            stream_i, [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) {                  blk_ctrl->issue_stream_cmd(stream_cmd, block_port); -            } -        ); +            });      }      // Notify all blocks in this chain that they are connected to an active streamer      recv_terminator->set_rx_streamer(true, 0); -    // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. -    // Note that we store the streamer only once, and use its terminator's -    // ID to do so. -    _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr<uhd::rx_streamer>(my_streamer); +    // Store a weak pointer to prevent a streamer->device3_impl->streamer circular +    // dependency. Note that we store the streamer only once, and use its terminator's ID +    // to do so. +    _rx_streamers[recv_terminator->unique_id()] = +        boost::weak_ptr<uhd::rx_streamer>(my_streamer);      // Sets tick rate, samp rate and scaling on this streamer.      // A registered terminator is required to do this. @@ -516,10 +496,11 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_)   **********************************************************************/  void device3_impl::update_tx_streamers(double /* rate */)  { -    for(const std::string &block_id:  _tx_streamers.keys()) { +    for (const std::string& block_id : _tx_streamers.keys()) {          UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id;          boost::shared_ptr<device3_send_packet_streamer> my_streamer = -            boost::dynamic_pointer_cast<device3_send_packet_streamer>(_tx_streamers[block_id].lock()); +            boost::dynamic_pointer_cast<device3_send_packet_streamer>( +                _tx_streamers[block_id].lock());          if (my_streamer) {              double tick_rate = my_streamer->get_terminator()->get_tick_rate();              if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -533,7 +514,9 @@ void device3_impl::update_tx_streamers(double /* rate */)              if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) {                  scaling = 32767.;              } -            UHD_TX_STREAMER_LOG() << "New tick_rate == " << tick_rate << "  New samp_rate == " << samp_rate << " New scaling == " << scaling ; +            UHD_TX_STREAMER_LOG() +                << "New tick_rate == " << tick_rate << "  New samp_rate == " << samp_rate +                << " New scaling == " << scaling;              my_streamer->set_tick_rate(tick_rate);              my_streamer->set_samp_rate(samp_rate);              my_streamer->set_scale_factor(scaling); @@ -541,7 +524,7 @@ void device3_impl::update_tx_streamers(double /* rate */)      }  } -tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) +tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t& args_)  {      boost::mutex::scoped_lock lock(_transport_setup_mutex);      stream_args_t args = sanitize_stream_args(args_); @@ -552,108 +535,111 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)      generate_channel_list(args, chan_list, chan_args);      // Note: All 'args.args' are merged into chan_args now. -    //shared async queue for all channels in streamer -    boost::shared_ptr<async_md_type> async_md(new async_md_type(1000/*messages deep*/)); +    // shared async queue for all channels in streamer +    boost::shared_ptr<async_md_type> async_md(new async_md_type(1000 /*messages deep*/));      // II. Iterate over all channels      boost::shared_ptr<device3_send_packet_streamer> my_streamer;      // The terminator's lifetime is coupled to the streamer.      // There is only one terminator. If the streamer has multiple channels,      // it will be connected to each downstream block. -    rfnoc::tx_stream_terminator::sptr send_terminator = rfnoc::tx_stream_terminator::make(); -    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) -    { +    rfnoc::tx_stream_terminator::sptr send_terminator = +        rfnoc::tx_stream_terminator::make(); +    for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) {          // First, configure the downstream blocks and create the transports          // Get block ID and mb index          uhd::rfnoc::block_id_t block_id = chan_list[stream_i];          // Update args so args.args is always valid for this particular channel: -        args.args = chan_args[stream_i]; +        args.args       = chan_args[stream_i];          size_t mb_index = block_id.get_device_no(); -        size_t suggested_block_port = args.args.cast<size_t>("block_port", rfnoc::ANY_PORT); +        size_t suggested_block_port = +            args.args.cast<size_t>("block_port", rfnoc::ANY_PORT);          // Access to this channel's block control          uhd::rfnoc::sink_block_ctrl_base::sptr blk_ctrl = -            boost::dynamic_pointer_cast<uhd::rfnoc::sink_block_ctrl_base>(get_block_ctrl(block_id)); +            boost::dynamic_pointer_cast<uhd::rfnoc::sink_block_ctrl_base>( +                get_block_ctrl(block_id));          // Connect the terminator with this channel's block.          // This will throw if the connection is not possible. -        size_t block_port = blk_ctrl->connect_upstream( -                send_terminator, -                suggested_block_port, -                args.args -        ); +        size_t block_port = +            blk_ctrl->connect_upstream(send_terminator, suggested_block_port, args.args);          const size_t terminator_port = send_terminator->connect_downstream(blk_ctrl);          blk_ctrl->set_upstream_port(block_port, terminator_port);          send_terminator->set_downstream_port(terminator_port, block_port);          // Check if the block connection is compatible (spp and item type) -        check_stream_sig_compatible(blk_ctrl->get_input_signature(block_port), args, "TX"); +        check_stream_sig_compatible( +            blk_ctrl->get_input_signature(block_port), args, "TX");          // Setup the dsp transport hints          device_addr_t tx_hints = get_tx_hints(mb_index);          const size_t fifo_size = blk_ctrl->get_fifo_size(block_port);          // Allocate sid and create transport          uhd::sid_t stream_address = blk_ctrl->get_address(block_port); -        UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ; +        UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string();          both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints); -        both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t("")); -        UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ; +        both_xports_t async_xport = +            make_transport(stream_address, ASYNC_MSG, device_addr_t("")); +        UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec;          // Configure flow control          // This disables the FC module's output, do this before configuring flow control          blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port);          blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port);          // Configure flow control on downstream block -        const size_t fc_window = std::min(tx_hints.cast<size_t>("send_buff_size", fifo_size), fifo_size); -        const size_t fc_handle_window = std::max<size_t>(1, fc_window / stream_options.tx_fc_response_freq); -        UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window << ", Flow Control Handler Window = " << fc_handle_window ; -        blk_ctrl->configure_flow_control_in( -                fc_handle_window, /*bytes*/ -                block_port -        ); +        const size_t fc_window = +            std::min(tx_hints.cast<size_t>("send_buff_size", fifo_size), fifo_size); +        const size_t fc_handle_window = +            std::max<size_t>(1, fc_window / stream_options.tx_fc_response_freq); +        UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window +                              << ", Flow Control Handler Window = " << fc_handle_window; +        blk_ctrl->configure_flow_control_in(fc_handle_window, /*bytes*/ +            block_port);          // Add flow control transport          boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); -        if (xport.endianness == ENDIANNESS_BIG) -        { -            fc_cache->to_host = uhd::ntohx<uint32_t>; +        if (xport.endianness == ENDIANNESS_BIG) { +            fc_cache->to_host   = uhd::ntohx<uint32_t>;              fc_cache->from_host = uhd::htonx<uint32_t>; -            fc_cache->pack = vrt::chdr::if_hdr_pack_be; -            fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; +            fc_cache->pack      = vrt::chdr::if_hdr_pack_be; +            fc_cache->unpack    = vrt::chdr::if_hdr_unpack_be;          } else { -            fc_cache->to_host = uhd::wtohx<uint32_t>; +            fc_cache->to_host   = uhd::wtohx<uint32_t>;              fc_cache->from_host = uhd::htowx<uint32_t>; -            fc_cache->pack = vrt::chdr::if_hdr_pack_le; -            fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; +            fc_cache->pack      = vrt::chdr::if_hdr_pack_le; +            fc_cache->unpack    = vrt::chdr::if_hdr_unpack_le;          } -        xport.send = zero_copy_flow_ctrl::make( -            xport.send, +        xport.send = zero_copy_flow_ctrl::make(xport.send,              [fc_cache, xport](managed_buffer::sptr buff) { -                return tx_flow_ctrl( -                    fc_cache, -                    xport.recv, -                    buff); +                return tx_flow_ctrl(fc_cache, xport.recv, buff);              }, -            NULL -        ); +            NULL);          // Configure return path for async messages -        blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); -        UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ; +        blk_ctrl->sr_write( +            uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); +        UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " +                              << boost::format("0x%04X") % xport.recv_sid.get_dst();          // FIXME: Once there is a better way to map the radio block and port          // to the channel or another way to receive asynchronous messages that          // is not in-band, this should be removed. -        if (args.args.has_key("radio_id") and args.args.has_key("radio_port")) -        { +        if (args.args.has_key("radio_id") and args.args.has_key("radio_port")) {              // Find downstream radio node and set the response SID to the host              uhd::rfnoc::block_id_t radio_id(args.args["radio_id"]);              size_t radio_port = args.args.cast<size_t>("radio_port", 0); -            std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); -            UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); -            for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node:  downstream_radio_nodes) { +            std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl>> +                downstream_radio_nodes = +                    blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); +            UHD_TX_STREAMER_LOG() +                << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); +            for (const boost::shared_ptr<uhd::rfnoc::radio_ctrl>& node : +                downstream_radio_nodes) {                  if (node->get_block_id() == radio_id) { -                    node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), radio_port); +                    node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, +                        async_xport.recv_sid.get_dst(), +                        radio_port);                  }              }          } else { @@ -663,34 +649,41 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)              // is not the same as the block_port.  It should be removed as              // soon as possible.              // Find all downstream radio nodes and set their response SID to the host -            std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl> > downstream_radio_nodes = blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); -            UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); -            for(const boost::shared_ptr<uhd::rfnoc::radio_ctrl> &node:  downstream_radio_nodes) { -                node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); +            std::vector<boost::shared_ptr<uhd::rfnoc::radio_ctrl>> +                downstream_radio_nodes = +                    blk_ctrl->find_downstream_node<uhd::rfnoc::radio_ctrl>(); +            UHD_TX_STREAMER_LOG() +                << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); +            for (const boost::shared_ptr<uhd::rfnoc::radio_ctrl>& node : +                downstream_radio_nodes) { +                node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, +                    async_xport.recv_sid.get_dst(), +                    block_port);              }          } -        // Second, configure the streamer now that the blocks and transports are configured - -        //make the new streamer given the samples per packet -        if (not my_streamer) -        { -            // To calculate the max number of samples per packet, we assume the maximum header length -            // to avoid fragmentation should the entire header be used. -            const size_t bpp = tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; -            const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item -            const size_t spp = std::min(args.args.cast<size_t>("spp", bpp/bpi), bpp/bpi); // samples per packet -            UHD_TX_STREAMER_LOG() << "spp == " << spp ; +        // Second, configure the streamer now that the blocks and transports are +        // configured + +        // make the new streamer given the samples per packet +        if (not my_streamer) { +            // To calculate the max number of samples per packet, we assume the maximum +            // header length to avoid fragmentation should the entire header be used. +            const size_t bpp = +                tx_hints.cast<size_t>("bpp", xport.send->get_send_frame_size()) +                - stream_options.tx_max_len_hdr; +            const size_t bpi = +                convert::get_bytes_per_item(args.otw_format); // bytes per item +            const size_t spp = std::min(args.args.cast<size_t>("spp", bpp / bpi), +                bpp / bpi); // samples per packet +            UHD_TX_STREAMER_LOG() << "spp == " << spp;              my_streamer = boost::make_shared<device3_send_packet_streamer>( -                    spp, -                    send_terminator, -                    xport, -                    async_xport); +                spp, send_terminator, xport, async_xport);              my_streamer->resize(chan_list.size());          } -        //init some streamer stuff +        // init some streamer stuff          std::string conv_endianness;          if (xport.endianness == ENDIANNESS_BIG) {              my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); @@ -700,69 +693,57 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)              conv_endianness = "le";          } -        //set the converter +        // set the converter          uhd::convert::id_type id; -        id.input_format = args.cpu_format; -        id.num_inputs = 1; +        id.input_format  = args.cpu_format; +        id.num_inputs    = 1;          id.output_format = args.otw_format + "_item32_" + conv_endianness; -        id.num_outputs = 1; +        id.num_outputs   = 1;          my_streamer->set_converter(id);          boost::shared_ptr<async_tx_info_t> async_tx_info(new async_tx_info_t()); -        async_tx_info->stream_channel = args.channels[stream_i]; -        async_tx_info->device_channel = mb_index; -        async_tx_info->async_queue = async_md; +        async_tx_info->stream_channel  = args.channels[stream_i]; +        async_tx_info->device_channel  = mb_index; +        async_tx_info->async_queue     = async_md;          async_tx_info->old_async_queue = _async_md; -        task::sptr async_task = task::make( -            [async_tx_info, async_xport, xport, send_terminator]() { -                handle_tx_async_msgs( -                        async_tx_info, -                        async_xport.recv, -                        xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> : uhd::wtohx<uint32_t>, -                        xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le, -                        [send_terminator]() {return send_terminator->get_tick_rate();} -                ); -            } -        ); +        task::sptr async_task = +            task::make([async_tx_info, async_xport, xport, send_terminator]() { +                handle_tx_async_msgs(async_tx_info, +                    async_xport.recv, +                    xport.endianness == ENDIANNESS_BIG ? uhd::ntohx<uint32_t> +                                                       : uhd::wtohx<uint32_t>, +                    xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be +                                                       : vrt::chdr::if_hdr_unpack_le, +                    [send_terminator]() { return send_terminator->get_tick_rate(); }); +            });          my_streamer->add_async_msg_task(async_task); -        //Give the streamer a functor to get the send buffer -        my_streamer->set_xport_chan_get_buff( -            stream_i, -            [xport](const double timeout) { -                return xport.send->get_send_buff(timeout); -            } -        ); -        //Give the streamer a functor handled received async messages +        // Give the streamer a functor to get the send buffer +        my_streamer->set_xport_chan_get_buff(stream_i, +            [xport](const double timeout) { return xport.send->get_send_buff(timeout); }); +        // Give the streamer a functor handled received async messages          my_streamer->set_async_receiver(              [async_md](uhd::async_metadata_t& md, const double timeout) {                  return async_md->pop_with_timed_wait(md, timeout); -            } -        ); +            });          my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid);          // CHDR does not support trailers          my_streamer->set_enable_trailer(false); -        my_streamer->set_xport_chan_post_send_cb( -            stream_i, -            [fc_cache, xport]() { -                tx_flow_ctrl_ack( -                    fc_cache, -                    xport.send, -                    xport.send_sid -                ); -            } -        ); +        my_streamer->set_xport_chan_post_send_cb(stream_i, [fc_cache, xport]() { +            tx_flow_ctrl_ack(fc_cache, xport.send, xport.send_sid); +        });      }      // Notify all blocks in this chain that they are connected to an active streamer      send_terminator->set_tx_streamer(true, 0); -    // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. -    // Note that we store the streamer only once, and use its terminator's -    // ID to do so. -    _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr<uhd::tx_streamer>(my_streamer); +    // Store a weak pointer to prevent a streamer->device3_impl->streamer circular +    // dependency. Note that we store the streamer only once, and use its terminator's ID +    // to do so. +    _tx_streamers[send_terminator->unique_id()] = +        boost::weak_ptr<uhd::tx_streamer>(my_streamer);      // Sets tick rate, samp rate and scaling on this streamer      // A registered terminator is required to do this. @@ -771,5 +752,3 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)      post_streamer_hooks(TX_DIRECTION);      return my_streamer;  } - - | 
