From 967be2a4e82b1a125b26bb72a60318a4fb2b50c4 Mon Sep 17 00:00:00 2001 From: Brent Stapleton Date: Mon, 14 Jan 2019 10:35:25 -0800 Subject: uhd: mpm: apply clang-format to all files Applying formatting changes to all .cpp and .hpp files in the following directories: ``` find host/examples/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find host/tests/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find host/lib/usrp/dboard/neon/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find host/lib/usrp/dboard/magnesium/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find host/lib/usrp/device3/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find host/lib/usrp/mpmd/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find host/lib/usrp/x300/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find host/utils/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file find mpm/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file ``` Also formatted host/include/, except Cpp03 was used as a the language standard instead of Cpp11. ``` sed -i 's/ Cpp11/ Cpp03/g' .clang-format find host/include/ -iname *.hpp -o -iname *.cpp | \ xargs clang-format -i -style=file ``` Formatting style was designated by the .clang-format file. --- host/lib/usrp/device3/device3_flow_ctrl.hpp | 259 +++++++------ host/lib/usrp/device3/device3_impl.cpp | 173 ++++----- host/lib/usrp/device3/device3_impl.hpp | 154 ++++---- host/lib/usrp/device3/device3_io_impl.cpp | 579 ++++++++++++++-------------- 4 files changed, 562 insertions(+), 603 deletions(-) (limited to 'host/lib/usrp/device3') diff --git a/host/lib/usrp/device3/device3_flow_ctrl.hpp b/host/lib/usrp/device3/device3_flow_ctrl.hpp index 50081543a..535d7fbac 100644 --- a/host/lib/usrp/device3/device3_flow_ctrl.hpp +++ b/host/lib/usrp/device3/device3_flow_ctrl.hpp @@ -8,10 +8,10 @@ #define INCLUDED_DEVICE3_FLOW_CTRL_HPP #include "device3_impl.hpp" -#include -#include -#include #include +#include +#include +#include #include namespace uhd { namespace usrp { @@ -19,12 +19,14 @@ namespace uhd { namespace usrp { //! Stores the state of RX flow control struct rx_fc_cache_t { - rx_fc_cache_t(): - interval(0), - last_byte_count(0), - total_bytes_consumed(0), - total_packets_consumed(0), - seq_num(0) {} + rx_fc_cache_t() + : interval(0) + , last_byte_count(0) + , total_bytes_consumed(0) + , total_packets_consumed(0) + , seq_num(0) + { + } //! Flow control interval in bytes size_t interval; @@ -40,117 +42,112 @@ struct rx_fc_cache_t uhd::transport::zero_copy_if::sptr xport; std::function to_host; std::function from_host; - std::function unpack; - std::function pack; + std::function + unpack; + std::function + pack; }; /*! Send out RX flow control packets. -* -* This function handles updating the counters for the consumed -* bytes and packets, determines if a flow control message is -* is necessary, and sends one if it is. Passing a nullptr for -* the buff parameter will skip the counter update. -* -* \param fc_cache RX flow control state information -* \param buff Receive buffer. Setting to nullptr will -* skip the counter update. -*/ + * + * This function handles updating the counters for the consumed + * bytes and packets, determines if a flow control message is + * is necessary, and sends one if it is. Passing a nullptr for + * the buff parameter will skip the counter update. + * + * \param fc_cache RX flow control state information + * \param buff Receive buffer. Setting to nullptr will + * skip the counter update. + */ inline bool rx_flow_ctrl( - boost::shared_ptr fc_cache, - uhd::transport::managed_buffer::sptr buff -) { + boost::shared_ptr fc_cache, uhd::transport::managed_buffer::sptr buff) +{ // If the caller supplied a buffer - if (buff) - { + if (buff) { // Unpack the header uhd::transport::vrt::if_packet_info_t packet_info; - packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *pkt = buff->cast(); + packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); + const uint32_t* pkt = buff->cast(); try { fc_cache->unpack(pkt, packet_info); - } - catch(const std::exception &ex) - { + } catch (const std::exception& ex) { // Log and ignore - UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl; + UHD_LOGGER_ERROR("RX FLOW CTRL") + << "Error unpacking packet: " << ex.what() << std::endl; return true; } // Update counters assuming the buffer is a consumed packet - if (not packet_info.error) - { + if (not packet_info.error) { fc_cache->total_bytes_consumed += buff->size(); fc_cache->total_packets_consumed++; } } // Just return if there is no need to send a flow control packet - if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) - { + if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) { return true; } // Time to send a flow control packet // Get a send buffer - uhd::transport::managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); + uhd::transport::managed_send_buffer::sptr fc_buff = + fc_cache->xport->get_send_buff(0.0); if (not fc_buff) { throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); } - uint32_t *pkt = fc_buff->cast(); + uint32_t* pkt = fc_buff->cast(); - //load packet info + // load packet info uhd::transport::vrt::if_packet_info_t packet_info; packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC; packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = fc_cache->seq_num++; - packet_info.sob = false; - packet_info.eob = false; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = fc_cache->sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; + packet_info.num_payload_bytes = packet_info.num_payload_words32 * sizeof(uint32_t); + packet_info.packet_count = fc_cache->seq_num++; + packet_info.sob = false; + packet_info.eob = false; + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = fc_cache->sid.get(); + packet_info.has_sid = true; + packet_info.has_cid = false; + packet_info.has_tsi = false; + packet_info.has_tsf = false; + packet_info.has_tlr = false; // Load Header: fc_cache->pack(pkt, packet_info); // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = + pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = fc_cache->from_host(fc_cache->total_packets_consumed); - pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = + pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = fc_cache->from_host(fc_cache->total_bytes_consumed); - //send the buffer over the interface - fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); + // send the buffer over the interface + fc_buff->commit(sizeof(uint32_t) * (packet_info.num_packet_words32)); - //update byte count + // update byte count fc_cache->last_byte_count = fc_cache->total_bytes_consumed; return true; } /*! Handle RX flow control ACK packets. -* -*/ + * + */ inline void handle_rx_flowctrl_ack( - boost::shared_ptr fc_cache, - const uint32_t *payload -) { - const uint32_t pkt_count = fc_cache->to_host(payload[0]); + boost::shared_ptr fc_cache, const uint32_t* payload) +{ + const uint32_t pkt_count = fc_cache->to_host(payload[0]); const uint32_t byte_count = fc_cache->to_host(payload[1]); - if (fc_cache->total_bytes_consumed != byte_count) - { + if (fc_cache->total_bytes_consumed != byte_count) { UHD_LOGGER_DEBUG("device3") << "oh noes: byte_count==" << byte_count - << " total_bytes_consumed==" << fc_cache->total_bytes_consumed - << std::hex << " sid==" << fc_cache->sid << std::dec - << std::endl - ; + << " total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::hex + << " sid==" << fc_cache->sid << std::dec << std::endl; } - fc_cache->total_bytes_consumed = byte_count; + fc_cache->total_bytes_consumed = byte_count; fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? // This will send a flow control packet if there is a significant discrepancy @@ -160,14 +157,16 @@ inline void handle_rx_flowctrl_ack( //! Stores the state of TX flow control struct tx_fc_cache_t { - tx_fc_cache_t(uint32_t capacity): - last_byte_ack(0), - last_seq_ack(0), - byte_count(0), - pkt_count(0), - window_size(capacity), - fc_ack_seqnum(0), - fc_received(false) {} + tx_fc_cache_t(uint32_t capacity) + : last_byte_ack(0) + , last_seq_ack(0) + , byte_count(0) + , pkt_count(0) + , window_size(capacity) + , fc_ack_seqnum(0) + , fc_received(false) + { + } uint32_t last_byte_ack; uint32_t last_seq_ack; @@ -178,26 +177,28 @@ struct tx_fc_cache_t bool fc_received; std::function to_host; std::function from_host; - std::function unpack; - std::function pack; + std::function + unpack; + std::function + pack; }; -inline bool tx_flow_ctrl( - boost::shared_ptr fc_cache, +inline bool tx_flow_ctrl(boost::shared_ptr fc_cache, uhd::transport::zero_copy_if::sptr xport, - uhd::transport::managed_buffer::sptr buff -) { - while (true) - { + uhd::transport::managed_buffer::sptr buff) +{ + while (true) { // If there is space - if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) - { + if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) + >= buff->size()) { // All is good - packet will be sent fc_cache->byte_count += buff->size(); // Round up to nearest word - if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) - { - fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); + if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) { + fc_cache->byte_count += + uhd::usrp::DEVICE3_LINE_SIZE + - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); } fc_cache->pkt_count++; return true; @@ -205,33 +206,33 @@ inline bool tx_flow_ctrl( // Look for a flow control message to update the space available in the buffer. uhd::transport::managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1); - if (buff) - { + if (buff) { uhd::transport::vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *packet_buff = buff->cast(); + if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); + const uint32_t* packet_buff = buff->cast(); try { fc_cache->unpack(packet_buff, if_packet_info); - } - catch(const std::exception &ex) - { - UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; + } catch (const std::exception& ex) { + UHD_LOGGER_ERROR("TX FLOW CTRL") + << "Error unpacking flow control packet: " << ex.what() << std::endl; continue; } - if (if_packet_info.packet_type != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC) - { - UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; + if (if_packet_info.packet_type + != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_FC) { + UHD_LOGGER_ERROR("TX FLOW CTRL") + << "Unexpected packet received by flow control handler: " + << if_packet_info.packet_type << std::endl; continue; } - const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; - const uint32_t pkt_count = fc_cache->to_host(payload[0]); + const uint32_t* payload = &packet_buff[if_packet_info.num_header_words32]; + const uint32_t pkt_count = fc_cache->to_host(payload[0]); const uint32_t byte_count = fc_cache->to_host(payload[1]); // update the amount of space fc_cache->last_byte_ack = byte_count; - fc_cache->last_seq_ack = pkt_count; + fc_cache->last_seq_ack = pkt_count; fc_cache->fc_received = true; } @@ -239,13 +240,11 @@ inline bool tx_flow_ctrl( return false; } -inline void tx_flow_ctrl_ack( - boost::shared_ptr fc_cache, +inline void tx_flow_ctrl_ack(boost::shared_ptr fc_cache, uhd::transport::zero_copy_if::sptr send_xport, - uhd::sid_t send_sid -) { - if (not fc_cache->fc_received) - { + uhd::sid_t send_sid) +{ + if (not fc_cache->fc_received) { return; } @@ -256,42 +255,42 @@ inline void tx_flow_ctrl_ack( UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; return; } - uint32_t *pkt = fc_buff->cast(); + uint32_t* pkt = fc_buff->cast(); // Load packet info uhd::transport::vrt::if_packet_info_t packet_info; packet_info.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_ACK; packet_info.num_payload_words32 = uhd::usrp::DEVICE3_FC_PACKET_LEN_IN_WORDS32; - packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = fc_cache->fc_ack_seqnum++; - packet_info.sob = false; - packet_info.eob = true; - packet_info.error = false; - packet_info.fc_ack = false; - packet_info.sid = send_sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; + packet_info.num_payload_bytes = packet_info.num_payload_words32 * sizeof(uint32_t); + packet_info.packet_count = fc_cache->fc_ack_seqnum++; + packet_info.sob = false; + packet_info.eob = true; + packet_info.error = false; + packet_info.fc_ack = false; + packet_info.sid = send_sid.get(); + packet_info.has_sid = true; + packet_info.has_cid = false; + packet_info.has_tsi = false; + packet_info.has_tsf = false; + packet_info.has_tlr = false; // Load Header: fc_cache->pack(pkt, packet_info); // Update counters to include this packet - size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); + size_t fc_ack_pkt_size = sizeof(uint32_t) * (packet_info.num_packet_words32); fc_cache->byte_count += fc_ack_pkt_size; // Round up to nearest word - if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) - { - fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); + if (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE) { + fc_cache->byte_count += uhd::usrp::DEVICE3_LINE_SIZE + - (fc_cache->byte_count % uhd::usrp::DEVICE3_LINE_SIZE); } fc_cache->pkt_count++; // Load Payload: Packet count, and byte count - pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = + pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_PACKET_COUNT_OFFSET] = fc_cache->from_host(fc_cache->pkt_count); - pkt[packet_info.num_header_words32+uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = + pkt[packet_info.num_header_words32 + uhd::usrp::DEVICE3_FC_BYTE_COUNT_OFFSET] = fc_cache->from_host(fc_cache->byte_count); // Send the buffer over the interface @@ -301,6 +300,6 @@ inline void tx_flow_ctrl_ack( fc_cache->fc_received = false; } -}}; +}}; // namespace uhd::usrp #endif /* INCLUDED_DEVICE3_FLOW_CTRL_HPP */ diff --git a/host/lib/usrp/device3/device3_impl.cpp b/host/lib/usrp/device3/device3_impl.cpp index 5705d6a84..d636b3338 100644 --- a/host/lib/usrp/device3/device3_impl.cpp +++ b/host/lib/usrp/device3/device3_impl.cpp @@ -6,10 +6,10 @@ // #include "device3_impl.hpp" -#include #include -#include +#include #include +#include #include #include @@ -18,21 +18,20 @@ using namespace uhd::usrp; device3_impl::device3_impl() { _type = uhd::device::USRP; - _async_md.reset(new async_md_type(1000/*messages deep*/)); + _async_md.reset(new async_md_type(1000 /*messages deep*/)); _tree = uhd::property_tree::make(); }; //! Returns true if the integer value stored in lhs is smaller than that in rhs -bool _compare_string_indexes(const std::string &lhs, const std::string &rhs) +bool _compare_string_indexes(const std::string& lhs, const std::string& rhs) { return boost::lexical_cast(lhs) < boost::lexical_cast(rhs); } -void device3_impl::merge_channel_defs( - const std::vector &chan_ids, - const std::vector &chan_args, - const uhd::direction_t dir -) { +void device3_impl::merge_channel_defs(const std::vector& chan_ids, + const std::vector& chan_args, + const uhd::direction_t dir) +{ UHD_ASSERT_THROW(chan_ids.size() == chan_args.size()); if (dir == uhd::DX_DIRECTION) { merge_channel_defs(chan_ids, chan_args, RX_DIRECTION); @@ -40,7 +39,8 @@ void device3_impl::merge_channel_defs( return; } - uhd::fs_path chans_root = uhd::fs_path("/channels/") / (dir == RX_DIRECTION ? "rx" : "tx"); + uhd::fs_path chans_root = + uhd::fs_path("/channels/") / (dir == RX_DIRECTION ? "rx" : "tx"); // Store the new positions of the channels: std::vector chan_idxs; @@ -54,18 +54,23 @@ void device3_impl::merge_channel_defs( // 2. Cycle through existing channels to find out where to merge // the new channels. Rules are: // - The order of chan_ids must be preserved - // - All block indices that are in chan_ids may be overwritten in the channel definition + // - All block indices that are in chan_ids may be overwritten in the channel + // definition // - If the channels in chan_ids are not yet in the property tree channel list, // they are appended. - for(const std::string &chan_idx: curr_channels) { + for (const std::string& chan_idx : curr_channels) { if (_tree->exists(chans_root / chan_idx)) { - rfnoc::block_id_t chan_block_id = _tree->access(chans_root / chan_idx).get(); - if (std::find(chan_ids.begin(), chan_ids.end(), chan_block_id) != chan_ids.end()) { + rfnoc::block_id_t chan_block_id = + _tree->access(chans_root / chan_idx).get(); + if (std::find(chan_ids.begin(), chan_ids.end(), chan_block_id) + != chan_ids.end()) { chan_idxs.push_back(boost::lexical_cast(chan_idx)); } } } - size_t last_chan_idx = curr_channels.empty() ? 0 : (boost::lexical_cast(curr_channels.back()) + 1); + size_t last_chan_idx = curr_channels.empty() + ? 0 + : (boost::lexical_cast(curr_channels.back()) + 1); while (chan_idxs.size() < chan_ids.size()) { chan_idxs.push_back(last_chan_idx); last_chan_idx++; @@ -80,27 +85,28 @@ void device3_impl::merge_channel_defs( if (not _tree->exists(chans_root / chan_idxs[i] / "args")) { _tree->create(chans_root / chan_idxs[i] / "args"); } - _tree->access(chans_root / chan_idxs[i] / "args").set(chan_args[i]); + _tree->access(chans_root / chan_idxs[i] / "args") + .set(chan_args[i]); } } /*********************************************************************** * RFNoC-Specific **********************************************************************/ -void device3_impl::enumerate_rfnoc_blocks( - size_t device_index, - size_t n_blocks, - size_t base_port, - const uhd::sid_t &base_sid, - uhd::device_addr_t transport_args -) { +void device3_impl::enumerate_rfnoc_blocks(size_t device_index, + size_t n_blocks, + size_t base_port, + const uhd::sid_t& base_sid, + uhd::device_addr_t transport_args) +{ // entries that are already connected to this block uhd::sid_t ctrl_sid = base_sid; - uhd::property_tree::sptr subtree = _tree->subtree(uhd::fs_path("/mboards") / device_index); + uhd::property_tree::sptr subtree = + _tree->subtree(uhd::fs_path("/mboards") / device_index); // 1) Clean property tree entries // TODO put this back once radios are actual rfnoc blocks!!!!!! - //if (subtree->exists("xbar")) { - //subtree->remove("xbar"); + // if (subtree->exists("xbar")) { + // subtree->remove("xbar"); //} // 2) Destroy existing block controllers // TODO: Clear out all the old block control classes @@ -109,40 +115,27 @@ void device3_impl::enumerate_rfnoc_blocks( // First, make a transport for port number zero, because we always need that: ctrl_sid.set_dst_xbarport(base_port + i); ctrl_sid.set_dst_blockport(0); - both_xports_t xport = this->make_transport( - ctrl_sid, - CTRL, - transport_args - ); + both_xports_t xport = this->make_transport(ctrl_sid, CTRL, transport_args); UHD_LOG_TRACE("DEVICE3", str(boost::format("Setting up NoC-Shell Control for port #0 (SID: %s)...") - % xport.send_sid.to_pp_string_hex()) - ); - uhd::rfnoc::ctrl_iface::sptr ctrl = uhd::rfnoc::ctrl_iface::make( - xport, - str(boost::format("CE_%02d_Port_%02X") - % i - % ctrl_sid.get_dst_endpoint()) - ); - uint64_t noc_id = ctrl->send_cmd_pkt( - uhd::rfnoc::SR_READBACK, - uhd::rfnoc::SR_READBACK_REG_ID, - true - ); - UHD_LOG_DEBUG("DEVICE3", str( - boost::format("Port 0x%02X: Found NoC-Block with ID %016X.") - % int(ctrl_sid.get_dst_endpoint()) - % noc_id - )); + % xport.send_sid.to_pp_string_hex())); + uhd::rfnoc::ctrl_iface::sptr ctrl = uhd::rfnoc::ctrl_iface::make(xport, + str(boost::format("CE_%02d_Port_%02X") % i % ctrl_sid.get_dst_endpoint())); + uint64_t noc_id = ctrl->send_cmd_pkt( + uhd::rfnoc::SR_READBACK, uhd::rfnoc::SR_READBACK_REG_ID, true); + UHD_LOG_DEBUG("DEVICE3", + str(boost::format("Port 0x%02X: Found NoC-Block with ID %016X.") + % int(ctrl_sid.get_dst_endpoint()) % noc_id)); uhd::rfnoc::make_args_t make_args; - uhd::rfnoc::blockdef::sptr block_def = uhd::rfnoc::blockdef::make_from_noc_id(noc_id); + uhd::rfnoc::blockdef::sptr block_def = + uhd::rfnoc::blockdef::make_from_noc_id(noc_id); if (not block_def) { UHD_LOG_WARNING("DEVICE3", "No block definition found, using default block configuration " - "for block with NOC ID: " + str(boost::format("0x%08X") % noc_id) - ); - block_def = uhd::rfnoc::blockdef::make_from_noc_id( - uhd::rfnoc::DEFAULT_NOC_ID); + "for block with NOC ID: " + + str(boost::format("0x%08X") % noc_id)); + block_def = + uhd::rfnoc::blockdef::make_from_noc_id(uhd::rfnoc::DEFAULT_NOC_ID); } UHD_ASSERT_THROW(block_def); make_args.ctrl_ifaces[0] = ctrl; @@ -151,71 +144,59 @@ void device3_impl::enumerate_rfnoc_blocks( continue; } ctrl_sid.set_dst_blockport(port_number); - both_xports_t xport1 = this->make_transport( - ctrl_sid, - CTRL, - transport_args - ); - UHD_LOG_TRACE("DEVICE3", str( - boost::format("Setting up NoC-Shell Control for port #%d " + both_xports_t xport1 = this->make_transport(ctrl_sid, CTRL, transport_args); + UHD_LOG_TRACE("DEVICE3", + str(boost::format("Setting up NoC-Shell Control for port #%d " "(SID: %s)...") - % port_number - % xport1.send_sid.to_pp_string_hex() - )); - uhd::rfnoc::ctrl_iface::sptr ctrl1 = uhd::rfnoc::ctrl_iface::make( - xport1, - str(boost::format("CE_%02d_Port_%02X") % i % ctrl_sid.get_dst_endpoint()) - ); + % port_number % xport1.send_sid.to_pp_string_hex())); + uhd::rfnoc::ctrl_iface::sptr ctrl1 = uhd::rfnoc::ctrl_iface::make(xport1, + str(boost::format("CE_%02d_Port_%02X") % i + % ctrl_sid.get_dst_endpoint())); make_args.ctrl_ifaces[port_number] = ctrl1; } UHD_LOG_TRACE("DEVICE3", - "All control transports successfully created for block with ID " << - str(boost::format("0x%08X") % noc_id) - ); + "All control transports successfully created for block with ID " + << str(boost::format("0x%08X") % noc_id)); make_args.base_address = xport.send_sid.get_dst(); make_args.device_index = device_index; - make_args.tree = subtree; - { //Critical section for block_ctrl vector access + make_args.tree = subtree; + { // Critical section for block_ctrl vector access boost::lock_guard lock(_block_ctrl_mutex); - _rfnoc_block_ctrl.push_back(uhd::rfnoc::block_ctrl_base::make(make_args, noc_id)); + _rfnoc_block_ctrl.push_back( + uhd::rfnoc::block_ctrl_base::make(make_args, noc_id)); } } } -uhd::rfnoc::graph::sptr device3_impl::create_graph(const std::string &name) +uhd::rfnoc::graph::sptr device3_impl::create_graph(const std::string& name) { // Create an async message handler - UHD_LOGGER_TRACE("DEVICE3") << "Creating async message handler for graph `" << name << "'..."; - // FIXME: right now this only can only handle source sid of 0 and xbar local addr of 2. - // This is ok for now because that most of our device has xbard local addr hardcode to 2. + UHD_LOGGER_TRACE("DEVICE3") + << "Creating async message handler for graph `" << name << "'..."; + // FIXME: right now this only can only handle source sid of 0 and xbar local addr + // of 2. This is ok for now because that most of our device has xbard local addr + // hardcode to 2. sid_t async_sid(0); async_sid.set_dst_addr(2); - both_xports_t async_xports = make_transport( - async_sid, - ASYNC_MSG, - //FIXME: only get rx_hints from mb index of 0 - get_rx_hints(0) - ); + both_xports_t async_xports = make_transport(async_sid, + ASYNC_MSG, + // FIXME: only get rx_hints from mb index of 0 + get_rx_hints(0)); UHD_LOGGER_TRACE("DEVICE3") << " Async transport ready." << std::endl; uhd::rfnoc::async_msg_handler::sptr async_msg_handler = - uhd::rfnoc::async_msg_handler::make( - async_xports.recv, - async_xports.send, - async_xports.send_sid, - async_xports.endianness - ); - UHD_LOGGER_TRACE("DEVICE3") << "Async message has address " << async_xports.send_sid << std::endl; + uhd::rfnoc::async_msg_handler::make(async_xports.recv, + async_xports.send, + async_xports.send_sid, + async_xports.endianness); + UHD_LOGGER_TRACE("DEVICE3") + << "Async message has address " << async_xports.send_sid << std::endl; // Create the graph UHD_LOGGER_TRACE("DEVICE3") << "Creating graph `" << name << "'..." << std::endl; uhd::rfnoc::graph::sptr graph = boost::make_shared( - name, - shared_from_this(), - async_msg_handler - ); + name, shared_from_this(), async_msg_handler); return graph; } - diff --git a/host/lib/usrp/device3/device3_impl.hpp b/host/lib/usrp/device3/device3_impl.hpp index e82597b9b..3bf6f6111 100644 --- a/host/lib/usrp/device3/device3_impl.hpp +++ b/host/lib/usrp/device3/device3_impl.hpp @@ -11,20 +11,20 @@ #ifndef INCLUDED_DEVICE3_IMPL_HPP #define INCLUDED_DEVICE3_IMPL_HPP +#include "../../transport/super_recv_packet_handler.hpp" +#include "../../transport/super_send_packet_handler.hpp" +#include #include -#include #include +#include #include -#include -#include -#include #include +#include +#include +#include #include -#include -#include "../../transport/super_send_packet_handler.hpp" -#include "../../transport/super_recv_packet_handler.hpp" -#include #include +#include #include namespace uhd { namespace usrp { @@ -32,31 +32,33 @@ namespace uhd { namespace usrp { /*********************************************************************** * Default settings (any device3 may override these) **********************************************************************/ -static const size_t DEVICE3_RX_FC_REQUEST_FREQ = 32; //per flow-control window -static const size_t DEVICE3_TX_FC_RESPONSE_FREQ = 8; -static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32 = 2; -static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET = 0; -static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET = 1; -static const size_t DEVICE3_LINE_SIZE = 8; - -static const size_t DEVICE3_TX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes -static const size_t DEVICE3_RX_MAX_HDR_LEN = uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes - -// This class manages the lifetime of the TX async message handler task, transports, and terminator +static const size_t DEVICE3_RX_FC_REQUEST_FREQ = 32; // per flow-control window +static const size_t DEVICE3_TX_FC_RESPONSE_FREQ = 8; +static const size_t DEVICE3_FC_PACKET_LEN_IN_WORDS32 = 2; +static const size_t DEVICE3_FC_PACKET_COUNT_OFFSET = 0; +static const size_t DEVICE3_FC_BYTE_COUNT_OFFSET = 1; +static const size_t DEVICE3_LINE_SIZE = 8; + +static const size_t DEVICE3_TX_MAX_HDR_LEN = + uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes +static const size_t DEVICE3_RX_MAX_HDR_LEN = + uhd::transport::vrt::chdr::max_if_hdr_words64 * sizeof(uint64_t); // Bytes + +// This class manages the lifetime of the TX async message handler task, transports, and +// terminator class device3_send_packet_streamer : public uhd::transport::sph::send_packet_streamer { public: - device3_send_packet_streamer( - const size_t max_num_samps, - const uhd::rfnoc::tx_stream_terminator::sptr terminator, - const both_xports_t data_xport, - const both_xports_t async_msg_xport - ) : - uhd::transport::sph::send_packet_streamer(max_num_samps), - _terminator(terminator), - _data_xport(data_xport), - _async_msg_xport(async_msg_xport) - {} + device3_send_packet_streamer(const size_t max_num_samps, + const uhd::rfnoc::tx_stream_terminator::sptr terminator, + const both_xports_t data_xport, + const both_xports_t async_msg_xport) + : uhd::transport::sph::send_packet_streamer(max_num_samps) + , _terminator(terminator) + , _data_xport(data_xport) + , _async_msg_xport(async_msg_xport) + { + } ~device3_send_packet_streamer() { @@ -81,18 +83,19 @@ private: std::vector _tx_async_msg_tasks; }; -// This class manages the lifetime of the RX transports and terminator and provides access to both +// This class manages the lifetime of the RX transports and terminator and provides access +// to both class device3_recv_packet_streamer : public uhd::transport::sph::recv_packet_streamer { public: - device3_recv_packet_streamer( - const size_t max_num_samps, - const uhd::rfnoc::rx_stream_terminator::sptr terminator, - const both_xports_t xport - ) : - uhd::transport::sph::recv_packet_streamer(max_num_samps), - _terminator(terminator), - _xport(xport) {} + device3_recv_packet_streamer(const size_t max_num_samps, + const uhd::rfnoc::rx_stream_terminator::sptr terminator, + const both_xports_t xport) + : uhd::transport::sph::recv_packet_streamer(max_num_samps) + , _terminator(terminator) + , _xport(xport) + { + } ~device3_recv_packet_streamer() {} @@ -111,7 +114,8 @@ private: both_xports_t _xport; }; -class device3_impl : public uhd::device3, public boost::enable_shared_from_this +class device3_impl : public uhd::device3, + public boost::enable_shared_from_this { public: /*********************************************************************** @@ -120,14 +124,9 @@ public: typedef uhd::transport::bounded_buffer async_md_type; //! The purpose of a transport - enum xport_type_t { - CTRL = 0, - ASYNC_MSG, - TX_DATA, - RX_DATA - }; + enum xport_type_t { CTRL = 0, ASYNC_MSG, TX_DATA, RX_DATA }; - enum xport_t {AXI, ETH, PCIE}; + enum xport_t { AXI, ETH, PCIE }; //! Stores all streaming-related options struct stream_options_t @@ -145,20 +144,21 @@ public: , rx_max_len_hdr(DEVICE3_RX_MAX_HDR_LEN) , rx_fc_request_freq(DEVICE3_RX_FC_REQUEST_FREQ) , tx_fc_response_freq(DEVICE3_TX_FC_RESPONSE_FREQ) - {} + { + } }; /*********************************************************************** * I/O Interface **********************************************************************/ - uhd::tx_streamer::sptr get_tx_stream(const uhd::stream_args_t &); - uhd::rx_streamer::sptr get_rx_stream(const uhd::stream_args_t &); - bool recv_async_msg(uhd::async_metadata_t &async_metadata, double timeout); + uhd::tx_streamer::sptr get_tx_stream(const uhd::stream_args_t&); + uhd::rx_streamer::sptr get_rx_stream(const uhd::stream_args_t&); + bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout); /*********************************************************************** * Other public APIs **********************************************************************/ - rfnoc::graph::sptr create_graph(const std::string &name=""); + rfnoc::graph::sptr create_graph(const std::string& name = ""); protected: /*********************************************************************** @@ -172,10 +172,10 @@ protected: **********************************************************************/ // The 'rate' argument is so we can use these as subscribers to rate changes public: // TODO make these protected again - void update_rx_streamers(double rate=-1.0); - void update_tx_streamers(double rate=-1.0); -protected: + void update_rx_streamers(double rate = -1.0); + void update_tx_streamers(double rate = -1.0); +protected: /*********************************************************************** * Transport-related **********************************************************************/ @@ -187,17 +187,21 @@ protected: * The source address in this value is not considered, only the * destination address. * \param xport_type Specify which kind of transport this is. - * \param args Additional arguments for the transport generation. See \ref page_transport - * for valid arguments. + * \param args Additional arguments for the transport generation. See \ref + * page_transport for valid arguments. */ - virtual uhd::both_xports_t make_transport( - const uhd::sid_t &address, + virtual uhd::both_xports_t make_transport(const uhd::sid_t& address, const xport_type_t xport_type, - const uhd::device_addr_t& args - ) = 0; + const uhd::device_addr_t& args) = 0; - virtual uhd::device_addr_t get_tx_hints(size_t) { return uhd::device_addr_t(); } - virtual uhd::device_addr_t get_rx_hints(size_t) { return uhd::device_addr_t(); } + virtual uhd::device_addr_t get_tx_hints(size_t) + { + return uhd::device_addr_t(); + } + virtual uhd::device_addr_t get_rx_hints(size_t) + { + return uhd::device_addr_t(); + } //! Is called after a streamer is generated virtual void post_streamer_hooks(uhd::direction_t) {} @@ -216,29 +220,25 @@ protected: * \param chan_args New channel args. Must have same length as chan_ids. * */ - void merge_channel_defs( - const std::vector &chan_ids, - const std::vector &chan_args, - const uhd::direction_t dir - ); + void merge_channel_defs(const std::vector& chan_ids, + const std::vector& chan_args, + const uhd::direction_t dir); /*********************************************************************** * RFNoC-Specific **********************************************************************/ - void enumerate_rfnoc_blocks( - size_t device_index, - size_t n_blocks, - size_t base_port, - const uhd::sid_t &base_sid, - uhd::device_addr_t transport_args - ); + void enumerate_rfnoc_blocks(size_t device_index, + size_t n_blocks, + size_t base_port, + const uhd::sid_t& base_sid, + uhd::device_addr_t transport_args); /*********************************************************************** * Members **********************************************************************/ // TODO: Maybe move these to private - uhd::dict > _rx_streamers; - uhd::dict > _tx_streamers; + uhd::dict> _rx_streamers; + uhd::dict> _tx_streamers; private: /*********************************************************************** diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index 7afa2ace0..c0f91368d 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -7,16 +7,16 @@ // Provides streaming-related functions which are used by device3 objects. -#include "device3_impl.hpp" #include "device3_flow_ctrl.hpp" +#include "device3_impl.hpp" #include -#include +#include +#include #include +#include +#include #include #include -#include -#include -#include #include #include #include @@ -32,7 +32,7 @@ using namespace uhd::transport; /*********************************************************************** * Helper functions for get_?x_stream() **********************************************************************/ -static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t &args_) +static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t& args_) { uhd::stream_args_t args = args_; if (args.channels.empty()) { @@ -42,31 +42,33 @@ static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t &args_) return args; } -static void check_stream_sig_compatible(const rfnoc::stream_sig_t &stream_sig, stream_args_t &args, const std::string &tx_rx) +static void check_stream_sig_compatible( + const rfnoc::stream_sig_t& stream_sig, stream_args_t& args, const std::string& tx_rx) { if (args.otw_format.empty()) { if (stream_sig.item_type.empty()) { - throw uhd::runtime_error(str( - boost::format("[%s Streamer] No otw_format defined!") % tx_rx - )); + throw uhd::runtime_error( + str(boost::format("[%s Streamer] No otw_format defined!") % tx_rx)); } else { args.otw_format = stream_sig.item_type; } - } else if (not stream_sig.item_type.empty() and stream_sig.item_type != args.otw_format) { - throw uhd::runtime_error(str( - boost::format("[%s Streamer] Conflicting OTW types defined: args.otw_format = '%s' <=> stream_sig.item_type = '%s'") - % tx_rx % args.otw_format % stream_sig.item_type - )); + } else if (not stream_sig.item_type.empty() + and stream_sig.item_type != args.otw_format) { + throw uhd::runtime_error( + str(boost::format("[%s Streamer] Conflicting OTW types defined: " + "args.otw_format = '%s' <=> stream_sig.item_type = '%s'") + % tx_rx % args.otw_format % stream_sig.item_type)); } const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item if (stream_sig.packet_size) { if (args.args.has_key("spp")) { size_t args_spp = args.args.cast("spp", 0); if (args_spp * bpi != stream_sig.packet_size) { - throw uhd::runtime_error(str( - boost::format("[%s Streamer] Conflicting packet sizes defined: args yields %d bytes but stream_sig.packet_size is %d bytes") - % tx_rx % (args_spp * bpi) % stream_sig.packet_size - )); + throw uhd::runtime_error( + str(boost::format( + "[%s Streamer] Conflicting packet sizes defined: args yields " + "%d bytes but stream_sig.packet_size is %d bytes") + % tx_rx % (args_spp * bpi) % stream_sig.packet_size)); } } else { args.args["spp"] = str(boost::format("%d") % (stream_sig.packet_size / bpi)); @@ -82,19 +84,18 @@ static void check_stream_sig_compatible(const rfnoc::stream_sig_t &stream_sig, s * * \param args_ Stream args. * \param[out] chan_list The list of channels in the correct order. - * \param[out] chan_args Channel args for every channel. `chan_args.size() == chan_list.size()` + * \param[out] chan_args Channel args for every channel. `chan_args.size() == + * chan_list.size()` */ -void generate_channel_list( - const uhd::stream_args_t &args_, - std::vector &chan_list, - std::vector &chan_args -) { +void generate_channel_list(const uhd::stream_args_t& args_, + std::vector& chan_list, + std::vector& chan_args) +{ uhd::stream_args_t args = args_; std::vector chan_list_(args.channels.size()); std::vector chan_args_(args.channels.size()); - for (size_t i = 0; i < args.channels.size(); i++) - { + for (size_t i = 0; i < args.channels.size(); i++) { // Extract block ID size_t chan_idx = args.channels[i]; std::string key = str(boost::format("block_id%d") % chan_idx); @@ -103,10 +104,10 @@ void generate_channel_list( } else if (args.args.has_key("block_id")) { chan_list_[i] = args.args["block_id"]; } else { - throw uhd::runtime_error(str( - boost::format("Cannot create streamers: No block_id specified for channel %d.") - % chan_idx - )); + throw uhd::runtime_error( + str(boost::format( + "Cannot create streamers: No block_id specified for channel %d.") + % chan_idx)); } // Split off known channel specific args @@ -125,7 +126,7 @@ void generate_channel_list( } // Add all remaining args to all channel args - for(device_addr_t &chan_arg: chan_args_) { + for (device_addr_t& chan_arg : chan_args_) { chan_arg = chan_arg.to_string() + "," + args.args.to_string(); } @@ -154,25 +155,20 @@ void generate_channel_list( * \returns The size of the flow control window in number of packets */ static size_t get_rx_flow_control_window( - size_t pkt_size, - size_t sw_buff_size, - const device_addr_t& rx_args -) { + size_t pkt_size, size_t sw_buff_size, const device_addr_t& rx_args) +{ double fullness_factor = rx_args.cast( - "recv_buff_fullness", - uhd::rfnoc::DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR - ); + "recv_buff_fullness", uhd::rfnoc::DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR); if (fullness_factor < 0.01 || fullness_factor > 1) { - throw uhd::value_error("recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)"); + throw uhd::value_error( + "recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)"); } size_t window_in_bytes = (static_cast(sw_buff_size * fullness_factor)); if (rx_args.has_key("max_recv_window")) { window_in_bytes = std::min( - window_in_bytes, - rx_args.cast("max_recv_window", window_in_bytes) - ); + window_in_bytes, rx_args.cast("max_recv_window", window_in_bytes)); } if (window_in_bytes < pkt_size) { throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size."); @@ -200,32 +196,28 @@ struct async_tx_info_t * * This is run inside a uhd::task as long as this streamer lives. */ -static void handle_tx_async_msgs( - boost::shared_ptr async_info, - zero_copy_if::sptr xport, - uint32_t (*to_host)(uint32_t), - void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), - boost::function get_tick_rate -) { +static void handle_tx_async_msgs(boost::shared_ptr async_info, + zero_copy_if::sptr xport, + uint32_t (*to_host)(uint32_t), + void (*unpack)(const uint32_t* packet_buff, vrt::if_packet_info_t&), + boost::function get_tick_rate) +{ managed_recv_buffer::sptr buff = xport->get_recv_buff(); - if (not buff) - { + if (not buff) { return; } - //extract packet info + // extract packet info vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *packet_buff = buff->cast(); + if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); + const uint32_t* packet_buff = buff->cast(); - //unpacking can fail - try - { + // unpacking can fail + try { unpack(packet_buff, if_packet_info); - } - catch(const std::exception &ex) - { - UHD_LOGGER_ERROR("STREAMER") << "Error parsing async message packet: " << ex.what() ; + } catch (const std::exception& ex) { + UHD_LOGGER_ERROR("STREAMER") + << "Error parsing async message packet: " << ex.what(); return; } @@ -234,21 +226,20 @@ static void handle_tx_async_msgs( tick_rate = 1; } - //fill in the async metadata + // fill in the async metadata async_metadata_t metadata; - load_metadata_from_buff( - to_host, - metadata, - if_packet_info, - packet_buff, - tick_rate, - async_info->stream_channel - ); + load_metadata_from_buff(to_host, + metadata, + if_packet_info, + packet_buff, + tick_rate, + async_info->stream_channel); // Filter out any flow control messages and cache the rest - if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) - { - UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl; + if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { + UHD_LOGGER_ERROR("TX ASYNC MSG") + << "Unexpected flow control message found in async message handling" + << std::endl; } else { async_info->async_queue->push_with_pop_on_full(metadata); metadata.channel = async_info->device_channel; @@ -257,9 +248,7 @@ static void handle_tx_async_msgs( } } -bool device3_impl::recv_async_msg( - async_metadata_t &async_metadata, double timeout -) +bool device3_impl::recv_async_msg(async_metadata_t& async_metadata, double timeout) { return _async_md->pop_with_timed_wait(async_metadata, timeout); } @@ -269,10 +258,11 @@ bool device3_impl::recv_async_msg( **********************************************************************/ void device3_impl::update_rx_streamers(double /* rate */) { - for(const std::string &block_id: _rx_streamers.keys()) { + for (const std::string& block_id : _rx_streamers.keys()) { UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id; boost::shared_ptr my_streamer = - boost::dynamic_pointer_cast(_rx_streamers[block_id].lock()); + boost::dynamic_pointer_cast( + _rx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -285,9 +275,11 @@ void device3_impl::update_rx_streamers(double /* rate */) } double scaling = my_streamer->get_terminator()->get_output_scale_factor(); if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) { - scaling = 1/32767.; + scaling = 1 / 32767.; } - UHD_RX_STREAMER_LOG() << " New tick_rate == " << tick_rate << " New samp_rate == " << samp_rate << " New scaling == " << scaling ; + UHD_RX_STREAMER_LOG() + << " New tick_rate == " << tick_rate + << " New samp_rate == " << samp_rate << " New scaling == " << scaling; my_streamer->set_tick_rate(tick_rate); my_streamer->set_samp_rate(samp_rate); @@ -296,7 +288,7 @@ void device3_impl::update_rx_streamers(double /* rate */) } } -rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) +rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t& args_) { boost::mutex::scoped_lock lock(_transport_setup_mutex); stream_args_t args = sanitize_stream_args(args_); @@ -312,88 +304,83 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) // The terminator's lifetime is coupled to the streamer. // There is only one terminator. If the streamer has multiple channels, // it will be connected to each upstream block. - rfnoc::rx_stream_terminator::sptr recv_terminator = rfnoc::rx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) - { + rfnoc::rx_stream_terminator::sptr recv_terminator = + rfnoc::rx_stream_terminator::make(); + for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { // First, configure blocks and create transport // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; - UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id ; + UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id; // Update args so args.args is always valid for this particular channel: - args.args = chan_args[stream_i]; + args.args = chan_args[stream_i]; size_t mb_index = block_id.get_device_no(); - size_t suggested_block_port = args.args.cast("block_port", rfnoc::ANY_PORT); + size_t suggested_block_port = + args.args.cast("block_port", rfnoc::ANY_PORT); // Access to this channel's block control uhd::rfnoc::source_block_ctrl_base::sptr blk_ctrl = - boost::dynamic_pointer_cast(get_block_ctrl(block_id)); + boost::dynamic_pointer_cast( + get_block_ctrl(block_id)); // Connect the terminator with this channel's block. size_t block_port = blk_ctrl->connect_downstream( - recv_terminator, - suggested_block_port, - args.args - ); + recv_terminator, suggested_block_port, args.args); const size_t terminator_port = recv_terminator->connect_upstream(blk_ctrl); blk_ctrl->set_downstream_port(block_port, terminator_port); recv_terminator->set_upstream_port(terminator_port, block_port); // Check if the block connection is compatible (spp and item type) - check_stream_sig_compatible(blk_ctrl->get_output_signature(block_port), args, "RX"); + check_stream_sig_compatible( + blk_ctrl->get_output_signature(block_port), args, "RX"); // Setup the DSP transport hints device_addr_t rx_hints = get_rx_hints(mb_index); - //allocate sid and create transport + // allocate sid and create transport uhd::sid_t stream_address = blk_ctrl->get_address(block_port); - UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string() ; + UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string(); both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints); - UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size; + UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec + << " actual recv_buff_size = " << xport.recv_buff_size; // Configure the block // Flow control setup const size_t pkt_size = xport.recv->get_recv_frame_size(); // Leave one pkt_size space for overrun packets - TODO make this obsolete - const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - pkt_size; - const size_t fc_handle_window = std::max(1, fc_window / stream_options.rx_fc_request_freq); - UHD_RX_STREAMER_LOG()<< "Flow Control Window = " << (fc_window) << ", Flow Control Handler Window = " << fc_handle_window; - blk_ctrl->configure_flow_control_out( - true, + const size_t fc_window = + get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) + - pkt_size; + const size_t fc_handle_window = + std::max(1, fc_window / stream_options.rx_fc_request_freq); + UHD_RX_STREAMER_LOG() << "Flow Control Window = " << (fc_window) + << ", Flow Control Handler Window = " << fc_handle_window; + blk_ctrl->configure_flow_control_out(true, fc_window, - rx_hints.cast("recv_pkt_limit", 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 - block_port - ); + rx_hints.cast("recv_pkt_limit", + 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 + block_port); // Add flow control transport boost::shared_ptr fc_cache(new rx_fc_cache_t()); - fc_cache->sid = xport.send_sid; - fc_cache->xport = xport.send; + fc_cache->sid = xport.send_sid; + fc_cache->xport = xport.send; fc_cache->interval = fc_handle_window; - if (xport.endianness == ENDIANNESS_BIG) - { - fc_cache->to_host = uhd::ntohx; + if (xport.endianness == ENDIANNESS_BIG) { + fc_cache->to_host = uhd::ntohx; fc_cache->from_host = uhd::htonx; - fc_cache->pack = vrt::chdr::if_hdr_pack_be; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; - } - else - { - fc_cache->to_host = uhd::wtohx; + fc_cache->pack = vrt::chdr::if_hdr_pack_be; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; + } else { + fc_cache->to_host = uhd::wtohx; fc_cache->from_host = uhd::htowx; - fc_cache->pack = vrt::chdr::if_hdr_pack_le; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; + fc_cache->pack = vrt::chdr::if_hdr_pack_le; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; } - xport.recv = zero_copy_flow_ctrl::make - ( - xport.recv, - NULL, - [fc_cache](managed_buffer::sptr buff) { - return rx_flow_ctrl( - fc_cache, - buff); - } - ); + xport.recv = zero_copy_flow_ctrl::make( + xport.recv, NULL, [fc_cache](managed_buffer::sptr buff) { + return rx_flow_ctrl(fc_cache, buff); + }); // Configure the block // Note: We need to set_destination() after writing to SR_CLEAR_TX_FC. @@ -406,36 +393,41 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) blk_ctrl->set_destination(xport.send_sid.get_src(), block_port); // Configure routing for responses - blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); - UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src() ; + blk_ctrl->sr_write( + uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); + UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src(); // Find all upstream radio nodes and set their response in SID to the host - std::vector > upstream_radio_nodes = blk_ctrl->find_upstream_node(); - UHD_RX_STREAMER_LOG() << "Number of upstream radio nodes: " << upstream_radio_nodes.size(); - for(const boost::shared_ptr &node: upstream_radio_nodes) { - node->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); + std::vector> upstream_radio_nodes = + blk_ctrl->find_upstream_node(); + UHD_RX_STREAMER_LOG() << "Number of upstream radio nodes: " + << upstream_radio_nodes.size(); + for (const boost::shared_ptr& node : + upstream_radio_nodes) { + node->sr_write( + uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); } // Second, configure the streamer - //make the new streamer given the samples per packet - if (not my_streamer) - { - // To calculate the max number of samples per packet, we assume the maximum header length - // to avoid fragmentation should the entire header be used. - const size_t bpp = pkt_size - stream_options.rx_max_len_hdr; // bytes per packet - const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast("spp", bpp/bpi), bpp/bpi); // samples per packet - UHD_RX_STREAMER_LOG() << "spp == " << spp ; + // make the new streamer given the samples per packet + if (not my_streamer) { + // To calculate the max number of samples per packet, we assume the maximum + // header length to avoid fragmentation should the entire header be used. + const size_t bpp = + pkt_size - stream_options.rx_max_len_hdr; // bytes per packet + const size_t bpi = + convert::get_bytes_per_item(args.otw_format); // bytes per item + const size_t spp = std::min(args.args.cast("spp", bpp / bpi), + bpp / bpi); // samples per packet + UHD_RX_STREAMER_LOG() << "spp == " << spp; my_streamer = boost::make_shared( - spp, - recv_terminator, - xport); + spp, recv_terminator, xport); my_streamer->resize(chan_list.size()); } - //init some streamer stuff + // init some streamer stuff std::string conv_endianness; if (xport.endianness == ENDIANNESS_BIG) { my_streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be); @@ -445,63 +437,51 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) conv_endianness = "le"; } - //set the converter + // set the converter uhd::convert::id_type id; - id.input_format = args.otw_format + "_item32_" + conv_endianness; - id.num_inputs = 1; + id.input_format = args.otw_format + "_item32_" + conv_endianness; + id.num_inputs = 1; id.output_format = args.cpu_format; - id.num_outputs = 1; + id.num_outputs = 1; my_streamer->set_converter(id); // Give the streamer a functor to handle flow control ACK messages my_streamer->set_xport_handle_flowctrl_ack( - stream_i, - [fc_cache](const uint32_t *payload) { - handle_rx_flowctrl_ack( - fc_cache, - payload - ); - } - ); + stream_i, [fc_cache](const uint32_t* payload) { + handle_rx_flowctrl_ack(fc_cache, payload); + }); - //Give the streamer a functor to get the recv_buffer - my_streamer->set_xport_chan_get_buff( - stream_i, - [xport](double timeout) { - return xport.recv->get_recv_buff(timeout); - }, + // Give the streamer a functor to get the recv_buffer + my_streamer->set_xport_chan_get_buff(stream_i, + [xport](double timeout) { return xport.recv->get_recv_buff(timeout); }, true /*flush*/ ); - //Give the streamer a functor to handle overruns - //bind requires a weak_ptr to break the a streamer->streamer circular dependency - //Using "this" is OK because we know that this device3_impl will outlive the streamer + // Give the streamer a functor to handle overruns + // bind requires a weak_ptr to break the a streamer->streamer circular dependency + // Using "this" is OK because we know that this device3_impl will outlive the + // streamer boost::weak_ptr weak_ptr(my_streamer); my_streamer->set_overflow_handler( - stream_i, - [recv_terminator, weak_ptr, stream_i]() { - recv_terminator->handle_overrun( - weak_ptr, - stream_i); - } - ); + stream_i, [recv_terminator, weak_ptr, stream_i]() { + recv_terminator->handle_overrun(weak_ptr, stream_i); + }); - //Give the streamer a functor issue stream cmd + // Give the streamer a functor issue stream cmd my_streamer->set_issue_stream_cmd( - stream_i, - [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) { + stream_i, [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) { blk_ctrl->issue_stream_cmd(stream_cmd, block_port); - } - ); + }); } // Notify all blocks in this chain that they are connected to an active streamer recv_terminator->set_rx_streamer(true, 0); - // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. - // Note that we store the streamer only once, and use its terminator's - // ID to do so. - _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr(my_streamer); + // Store a weak pointer to prevent a streamer->device3_impl->streamer circular + // dependency. Note that we store the streamer only once, and use its terminator's ID + // to do so. + _rx_streamers[recv_terminator->unique_id()] = + boost::weak_ptr(my_streamer); // Sets tick rate, samp rate and scaling on this streamer. // A registered terminator is required to do this. @@ -516,10 +496,11 @@ rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) **********************************************************************/ void device3_impl::update_tx_streamers(double /* rate */) { - for(const std::string &block_id: _tx_streamers.keys()) { + for (const std::string& block_id : _tx_streamers.keys()) { UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id; boost::shared_ptr my_streamer = - boost::dynamic_pointer_cast(_tx_streamers[block_id].lock()); + boost::dynamic_pointer_cast( + _tx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { @@ -533,7 +514,9 @@ void device3_impl::update_tx_streamers(double /* rate */) if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) { scaling = 32767.; } - UHD_TX_STREAMER_LOG() << "New tick_rate == " << tick_rate << " New samp_rate == " << samp_rate << " New scaling == " << scaling ; + UHD_TX_STREAMER_LOG() + << "New tick_rate == " << tick_rate << " New samp_rate == " << samp_rate + << " New scaling == " << scaling; my_streamer->set_tick_rate(tick_rate); my_streamer->set_samp_rate(samp_rate); my_streamer->set_scale_factor(scaling); @@ -541,7 +524,7 @@ void device3_impl::update_tx_streamers(double /* rate */) } } -tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) +tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t& args_) { boost::mutex::scoped_lock lock(_transport_setup_mutex); stream_args_t args = sanitize_stream_args(args_); @@ -552,108 +535,111 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) generate_channel_list(args, chan_list, chan_args); // Note: All 'args.args' are merged into chan_args now. - //shared async queue for all channels in streamer - boost::shared_ptr async_md(new async_md_type(1000/*messages deep*/)); + // shared async queue for all channels in streamer + boost::shared_ptr async_md(new async_md_type(1000 /*messages deep*/)); // II. Iterate over all channels boost::shared_ptr my_streamer; // The terminator's lifetime is coupled to the streamer. // There is only one terminator. If the streamer has multiple channels, // it will be connected to each downstream block. - rfnoc::tx_stream_terminator::sptr send_terminator = rfnoc::tx_stream_terminator::make(); - for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) - { + rfnoc::tx_stream_terminator::sptr send_terminator = + rfnoc::tx_stream_terminator::make(); + for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { // First, configure the downstream blocks and create the transports // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; // Update args so args.args is always valid for this particular channel: - args.args = chan_args[stream_i]; + args.args = chan_args[stream_i]; size_t mb_index = block_id.get_device_no(); - size_t suggested_block_port = args.args.cast("block_port", rfnoc::ANY_PORT); + size_t suggested_block_port = + args.args.cast("block_port", rfnoc::ANY_PORT); // Access to this channel's block control uhd::rfnoc::sink_block_ctrl_base::sptr blk_ctrl = - boost::dynamic_pointer_cast(get_block_ctrl(block_id)); + boost::dynamic_pointer_cast( + get_block_ctrl(block_id)); // Connect the terminator with this channel's block. // This will throw if the connection is not possible. - size_t block_port = blk_ctrl->connect_upstream( - send_terminator, - suggested_block_port, - args.args - ); + size_t block_port = + blk_ctrl->connect_upstream(send_terminator, suggested_block_port, args.args); const size_t terminator_port = send_terminator->connect_downstream(blk_ctrl); blk_ctrl->set_upstream_port(block_port, terminator_port); send_terminator->set_downstream_port(terminator_port, block_port); // Check if the block connection is compatible (spp and item type) - check_stream_sig_compatible(blk_ctrl->get_input_signature(block_port), args, "TX"); + check_stream_sig_compatible( + blk_ctrl->get_input_signature(block_port), args, "TX"); // Setup the dsp transport hints device_addr_t tx_hints = get_tx_hints(mb_index); const size_t fifo_size = blk_ctrl->get_fifo_size(block_port); // Allocate sid and create transport uhd::sid_t stream_address = blk_ctrl->get_address(block_port); - UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ; + UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string(); both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints); - both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t("")); - UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ; + both_xports_t async_xport = + make_transport(stream_address, ASYNC_MSG, device_addr_t("")); + UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec; // Configure flow control // This disables the FC module's output, do this before configuring flow control blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); // Configure flow control on downstream block - const size_t fc_window = std::min(tx_hints.cast("send_buff_size", fifo_size), fifo_size); - const size_t fc_handle_window = std::max(1, fc_window / stream_options.tx_fc_response_freq); - UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window << ", Flow Control Handler Window = " << fc_handle_window ; - blk_ctrl->configure_flow_control_in( - fc_handle_window, /*bytes*/ - block_port - ); + const size_t fc_window = + std::min(tx_hints.cast("send_buff_size", fifo_size), fifo_size); + const size_t fc_handle_window = + std::max(1, fc_window / stream_options.tx_fc_response_freq); + UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window + << ", Flow Control Handler Window = " << fc_handle_window; + blk_ctrl->configure_flow_control_in(fc_handle_window, /*bytes*/ + block_port); // Add flow control transport boost::shared_ptr fc_cache(new tx_fc_cache_t(fc_window)); - if (xport.endianness == ENDIANNESS_BIG) - { - fc_cache->to_host = uhd::ntohx; + if (xport.endianness == ENDIANNESS_BIG) { + fc_cache->to_host = uhd::ntohx; fc_cache->from_host = uhd::htonx; - fc_cache->pack = vrt::chdr::if_hdr_pack_be; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; + fc_cache->pack = vrt::chdr::if_hdr_pack_be; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; } else { - fc_cache->to_host = uhd::wtohx; + fc_cache->to_host = uhd::wtohx; fc_cache->from_host = uhd::htowx; - fc_cache->pack = vrt::chdr::if_hdr_pack_le; - fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; + fc_cache->pack = vrt::chdr::if_hdr_pack_le; + fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; } - xport.send = zero_copy_flow_ctrl::make( - xport.send, + xport.send = zero_copy_flow_ctrl::make(xport.send, [fc_cache, xport](managed_buffer::sptr buff) { - return tx_flow_ctrl( - fc_cache, - xport.recv, - buff); + return tx_flow_ctrl(fc_cache, xport.recv, buff); }, - NULL - ); + NULL); // Configure return path for async messages - blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); - UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ; + blk_ctrl->sr_write( + uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); + UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " + << boost::format("0x%04X") % xport.recv_sid.get_dst(); // FIXME: Once there is a better way to map the radio block and port // to the channel or another way to receive asynchronous messages that // is not in-band, this should be removed. - if (args.args.has_key("radio_id") and args.args.has_key("radio_port")) - { + if (args.args.has_key("radio_id") and args.args.has_key("radio_port")) { // Find downstream radio node and set the response SID to the host uhd::rfnoc::block_id_t radio_id(args.args["radio_id"]); size_t radio_port = args.args.cast("radio_port", 0); - std::vector > downstream_radio_nodes = blk_ctrl->find_downstream_node(); - UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); - for(const boost::shared_ptr &node: downstream_radio_nodes) { + std::vector> + downstream_radio_nodes = + blk_ctrl->find_downstream_node(); + UHD_TX_STREAMER_LOG() + << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); + for (const boost::shared_ptr& node : + downstream_radio_nodes) { if (node->get_block_id() == radio_id) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), radio_port); + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, + async_xport.recv_sid.get_dst(), + radio_port); } } } else { @@ -663,34 +649,41 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) // is not the same as the block_port. It should be removed as // soon as possible. // Find all downstream radio nodes and set their response SID to the host - std::vector > downstream_radio_nodes = blk_ctrl->find_downstream_node(); - UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); - for(const boost::shared_ptr &node: downstream_radio_nodes) { - node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); + std::vector> + downstream_radio_nodes = + blk_ctrl->find_downstream_node(); + UHD_TX_STREAMER_LOG() + << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); + for (const boost::shared_ptr& node : + downstream_radio_nodes) { + node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, + async_xport.recv_sid.get_dst(), + block_port); } } - // Second, configure the streamer now that the blocks and transports are configured - - //make the new streamer given the samples per packet - if (not my_streamer) - { - // To calculate the max number of samples per packet, we assume the maximum header length - // to avoid fragmentation should the entire header be used. - const size_t bpp = tx_hints.cast("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; - const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item - const size_t spp = std::min(args.args.cast("spp", bpp/bpi), bpp/bpi); // samples per packet - UHD_TX_STREAMER_LOG() << "spp == " << spp ; + // Second, configure the streamer now that the blocks and transports are + // configured + + // make the new streamer given the samples per packet + if (not my_streamer) { + // To calculate the max number of samples per packet, we assume the maximum + // header length to avoid fragmentation should the entire header be used. + const size_t bpp = + tx_hints.cast("bpp", xport.send->get_send_frame_size()) + - stream_options.tx_max_len_hdr; + const size_t bpi = + convert::get_bytes_per_item(args.otw_format); // bytes per item + const size_t spp = std::min(args.args.cast("spp", bpp / bpi), + bpp / bpi); // samples per packet + UHD_TX_STREAMER_LOG() << "spp == " << spp; my_streamer = boost::make_shared( - spp, - send_terminator, - xport, - async_xport); + spp, send_terminator, xport, async_xport); my_streamer->resize(chan_list.size()); } - //init some streamer stuff + // init some streamer stuff std::string conv_endianness; if (xport.endianness == ENDIANNESS_BIG) { my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); @@ -700,69 +693,57 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) conv_endianness = "le"; } - //set the converter + // set the converter uhd::convert::id_type id; - id.input_format = args.cpu_format; - id.num_inputs = 1; + id.input_format = args.cpu_format; + id.num_inputs = 1; id.output_format = args.otw_format + "_item32_" + conv_endianness; - id.num_outputs = 1; + id.num_outputs = 1; my_streamer->set_converter(id); boost::shared_ptr async_tx_info(new async_tx_info_t()); - async_tx_info->stream_channel = args.channels[stream_i]; - async_tx_info->device_channel = mb_index; - async_tx_info->async_queue = async_md; + async_tx_info->stream_channel = args.channels[stream_i]; + async_tx_info->device_channel = mb_index; + async_tx_info->async_queue = async_md; async_tx_info->old_async_queue = _async_md; - task::sptr async_task = task::make( - [async_tx_info, async_xport, xport, send_terminator]() { - handle_tx_async_msgs( - async_tx_info, - async_xport.recv, - xport.endianness == ENDIANNESS_BIG ? uhd::ntohx : uhd::wtohx, - xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le, - [send_terminator]() {return send_terminator->get_tick_rate();} - ); - } - ); + task::sptr async_task = + task::make([async_tx_info, async_xport, xport, send_terminator]() { + handle_tx_async_msgs(async_tx_info, + async_xport.recv, + xport.endianness == ENDIANNESS_BIG ? uhd::ntohx + : uhd::wtohx, + xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be + : vrt::chdr::if_hdr_unpack_le, + [send_terminator]() { return send_terminator->get_tick_rate(); }); + }); my_streamer->add_async_msg_task(async_task); - //Give the streamer a functor to get the send buffer - my_streamer->set_xport_chan_get_buff( - stream_i, - [xport](const double timeout) { - return xport.send->get_send_buff(timeout); - } - ); - //Give the streamer a functor handled received async messages + // Give the streamer a functor to get the send buffer + my_streamer->set_xport_chan_get_buff(stream_i, + [xport](const double timeout) { return xport.send->get_send_buff(timeout); }); + // Give the streamer a functor handled received async messages my_streamer->set_async_receiver( [async_md](uhd::async_metadata_t& md, const double timeout) { return async_md->pop_with_timed_wait(md, timeout); - } - ); + }); my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid); // CHDR does not support trailers my_streamer->set_enable_trailer(false); - my_streamer->set_xport_chan_post_send_cb( - stream_i, - [fc_cache, xport]() { - tx_flow_ctrl_ack( - fc_cache, - xport.send, - xport.send_sid - ); - } - ); + my_streamer->set_xport_chan_post_send_cb(stream_i, [fc_cache, xport]() { + tx_flow_ctrl_ack(fc_cache, xport.send, xport.send_sid); + }); } // Notify all blocks in this chain that they are connected to an active streamer send_terminator->set_tx_streamer(true, 0); - // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. - // Note that we store the streamer only once, and use its terminator's - // ID to do so. - _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr(my_streamer); + // Store a weak pointer to prevent a streamer->device3_impl->streamer circular + // dependency. Note that we store the streamer only once, and use its terminator's ID + // to do so. + _tx_streamers[send_terminator->unique_id()] = + boost::weak_ptr(my_streamer); // Sets tick rate, samp rate and scaling on this streamer // A registered terminator is required to do this. @@ -771,5 +752,3 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) post_streamer_hooks(TX_DIRECTION); return my_streamer; } - - -- cgit v1.2.3