// // Copyright 2014-2016 Ettus Research LLC // Copyright 2018 Ettus Research, a National Instruments Company // // SPDX-License-Identifier: GPL-3.0-or-later // // Provides streaming-related functions which are used by device3 objects. #include "device3_impl.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #define UHD_TX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER") #define UHD_RX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER") using namespace uhd; using namespace uhd::usrp; using namespace uhd::transport; /*********************************************************************** * Helper functions for get_?x_stream() **********************************************************************/ static uhd::stream_args_t sanitize_stream_args(const uhd::stream_args_t &args_) { uhd::stream_args_t args = args_; if (args.channels.empty()) { args.channels = std::vector(1, 0); } return args; } static void check_stream_sig_compatible(const rfnoc::stream_sig_t &stream_sig, stream_args_t &args, const std::string &tx_rx) { if (args.otw_format.empty()) { if (stream_sig.item_type.empty()) { throw uhd::runtime_error(str( boost::format("[%s Streamer] No otw_format defined!") % tx_rx )); } else { args.otw_format = stream_sig.item_type; } } else if (not stream_sig.item_type.empty() and stream_sig.item_type != args.otw_format) { throw uhd::runtime_error(str( boost::format("[%s Streamer] Conflicting OTW types defined: args.otw_format = '%s' <=> stream_sig.item_type = '%s'") % tx_rx % args.otw_format % stream_sig.item_type )); } const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item if (stream_sig.packet_size) { if (args.args.has_key("spp")) { size_t args_spp = args.args.cast("spp", 0); if (args_spp * bpi != stream_sig.packet_size) { throw uhd::runtime_error(str( boost::format("[%s Streamer] Conflicting packet sizes defined: args yields %d bytes but stream_sig.packet_size is %d bytes") % tx_rx % (args_spp * bpi) % stream_sig.packet_size )); } } else { args.args["spp"] = str(boost::format("%d") % (stream_sig.packet_size / bpi)); } } } /*! \brief Returns a list of rx or tx channels for a streamer. * * If the given stream args contain instructions to set up channels, * those are used. Otherwise, the current device's channel definition * is consulted. * * \param args_ Stream args. * \param[out] chan_list The list of channels in the correct order. * \param[out] chan_args Channel args for every channel. `chan_args.size() == chan_list.size()` */ void generate_channel_list( const uhd::stream_args_t &args_, std::vector &chan_list, std::vector &chan_args ) { uhd::stream_args_t args = args_; std::vector chan_list_(args.channels.size()); std::vector chan_args_(args.channels.size()); for (size_t i = 0; i < args.channels.size(); i++) { // Extract block ID size_t chan_idx = args.channels[i]; std::string key = str(boost::format("block_id%d") % chan_idx); if (args.args.has_key(key)) { chan_list_[i] = args.args.pop(key); } else if (args.args.has_key("block_id")) { chan_list_[i] = args.args["block_id"]; } else { throw uhd::runtime_error(str( boost::format("Cannot create streamers: No block_id specified for channel %d.") % chan_idx )); } // Split off known channel specific args key = str(boost::format("block_port%d") % chan_idx); if (args.args.has_key(key)) { chan_args_[i]["block_port"] = args.args.pop(key); } key = str(boost::format("radio_id%d") % chan_idx); if (args.args.has_key(key)) { chan_args_[i]["radio_id"] = args.args.pop(key); } key = str(boost::format("radio_port%d") % chan_idx); if (args.args.has_key(key)) { chan_args_[i]["radio_port"] = args.args.pop(key); } } // Add all remaining args to all channel args for(device_addr_t &chan_arg: chan_args_) { chan_arg = chan_arg.to_string() + "," + args.args.to_string(); } chan_list = chan_list_; chan_args = chan_args_; } /*********************************************************************** * RX Flow Control Functions **********************************************************************/ //! Stores the state of RX flow control struct rx_fc_cache_t { rx_fc_cache_t(): interval(0), last_byte_count(0), total_bytes_consumed(0), total_packets_consumed(0), seq_num(0) {} //! Flow control interval in bytes size_t interval; //! Byte count at last flow control packet uint32_t last_byte_count; //! This will wrap around, but that's OK, because math. uint32_t total_bytes_consumed; //! This will wrap around, but that's OK, because math. uint32_t total_packets_consumed; //! Sequence number of next flow control packet uint64_t seq_num; sid_t sid; zero_copy_if::sptr xport; std::function to_host; std::function from_host; std::function unpack; std::function pack; }; /*! Determine the size of the flow control window in number of packets. * * This value depends on three things: * - The packet size (in bytes), P * - The size of the software buffer (in bytes), B * - The desired buffer fullness, F * * The FC window size is thus X = floor(B*F/P). * * \param pkt_size The maximum packet size in bytes * \param sw_buff_size Software buffer size in bytes * \param rx_args If this has a key 'recv_buff_fullness', this value will * be used for said fullness. Must be between 0.01 and 1. * * \returns The size of the flow control window in number of packets */ static size_t get_rx_flow_control_window( size_t pkt_size, size_t sw_buff_size, const device_addr_t& rx_args ) { double fullness_factor = rx_args.cast( "recv_buff_fullness", uhd::rfnoc::DEFAULT_FC_RX_SW_BUFF_FULL_FACTOR ); if (fullness_factor < 0.01 || fullness_factor > 1) { throw uhd::value_error("recv_buff_fullness must be in [0.01, 1] inclusive (1% to 100%)"); } size_t window_in_bytes = (static_cast(sw_buff_size * fullness_factor)); if (rx_args.has_key("max_recv_window")) { window_in_bytes = std::min( window_in_bytes, rx_args.cast("max_recv_window", window_in_bytes) ); } if (window_in_bytes < pkt_size) { throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size."); } UHD_ASSERT_THROW(size_t(sw_buff_size * fullness_factor) >= window_in_bytes); return window_in_bytes; } /*! Send out RX flow control packets. * * This function handles updating the counters for the consumed * bytes and packets, determines if a flow control message is * is necessary, and sends one if it is. Passing a nullptr for * the buff parameter will skip the counter update. * * \param fc_cache RX flow control state information * \param buff Receive buffer. Setting to nullptr will * skip the counter update. */ static bool rx_flow_ctrl( boost::shared_ptr fc_cache, managed_buffer::sptr buff ) { // If the caller supplied a buffer if (buff) { // Unpack the header vrt::if_packet_info_t packet_info; packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); const uint32_t *pkt = buff->cast(); try { fc_cache->unpack(pkt, packet_info); } catch(const std::exception &ex) { // Log and ignore UHD_LOGGER_ERROR("RX FLOW CTRL") << "Error unpacking packet: " << ex.what() << std::endl; return true; } // Update counters assuming the buffer is a consumed packet if (not packet_info.error) { fc_cache->total_bytes_consumed += buff->size(); fc_cache->total_packets_consumed++; } } // Just return if there is no need to send a flow control packet if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) { return true; } // Time to send a flow control packet // Get a send buffer managed_send_buffer::sptr fc_buff = fc_cache->xport->get_send_buff(0.0); if (not fc_buff) { throw uhd::runtime_error("rx_flowctrl timed out getting a send buffer"); } uint32_t *pkt = fc_buff->cast(); //load packet info vrt::if_packet_info_t packet_info; packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_FC; packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); packet_info.packet_count = fc_cache->seq_num++; packet_info.sob = false; packet_info.eob = false; packet_info.error = false; packet_info.fc_ack = false; packet_info.sid = fc_cache->sid.get(); packet_info.has_sid = true; packet_info.has_cid = false; packet_info.has_tsi = false; packet_info.has_tsf = false; packet_info.has_tlr = false; // Load Header: fc_cache->pack(pkt, packet_info); // Load Payload: Packet count, and byte count pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = fc_cache->from_host(fc_cache->total_packets_consumed); pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = fc_cache->from_host(fc_cache->total_bytes_consumed); //send the buffer over the interface fc_buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); //update byte count fc_cache->last_byte_count = fc_cache->total_bytes_consumed; return true; } /*! Handle RX flow control ACK packets. * */ static void handle_rx_flowctrl_ack( boost::shared_ptr fc_cache, const uint32_t *payload ) { const uint32_t pkt_count = fc_cache->to_host(payload[0]); const uint32_t byte_count = fc_cache->to_host(payload[1]); if (fc_cache->total_bytes_consumed != byte_count) { UHD_LOGGER_DEBUG("device3") << "oh noes: byte_count==" << byte_count << " total_bytes_consumed==" << fc_cache->total_bytes_consumed << std::hex << " sid==" << fc_cache->sid << std::dec << std::endl ; } fc_cache->total_bytes_consumed = byte_count; fc_cache->total_packets_consumed = pkt_count; // guess we need a pkt offset too? // This will send a flow control packet if there is a significant discrepancy rx_flow_ctrl(fc_cache, nullptr); } /*********************************************************************** * TX Flow Control Functions **********************************************************************/ #define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 //! Stores the state of TX flow control struct tx_fc_cache_t { tx_fc_cache_t(uint32_t capacity): last_byte_ack(0), last_seq_ack(0), byte_count(0), pkt_count(0), window_size(capacity), fc_ack_seqnum(0), fc_received(false) {} uint32_t last_byte_ack; uint32_t last_seq_ack; uint32_t byte_count; uint32_t pkt_count; uint32_t window_size; uint32_t fc_ack_seqnum; bool fc_received; std::function to_host; std::function from_host; std::function unpack; std::function pack; }; static bool tx_flow_ctrl( boost::shared_ptr fc_cache, zero_copy_if::sptr xport, managed_buffer::sptr buff ) { while (true) { // If there is space if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) >= buff->size()) { // All is good - packet will be sent fc_cache->byte_count += buff->size(); // Round up to nearest word if (fc_cache->byte_count % DEVICE3_LINE_SIZE) { fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); } fc_cache->pkt_count++; return true; } // Look for a flow control message to update the space available in the buffer. managed_recv_buffer::sptr buff = xport->get_recv_buff(0.1); if (buff) { vrt::if_packet_info_t if_packet_info; if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); const uint32_t *packet_buff = buff->cast(); try { fc_cache->unpack(packet_buff, if_packet_info); } catch(const std::exception &ex) { UHD_LOGGER_ERROR("TX FLOW CTRL") << "Error unpacking flow control packet: " << ex.what() << std::endl; continue; } if (if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_FC) { UHD_LOGGER_ERROR("TX FLOW CTRL") << "Unexpected packet received by flow control handler: " << if_packet_info.packet_type << std::endl; continue; } const uint32_t *payload = &packet_buff[if_packet_info.num_header_words32]; const uint32_t pkt_count = fc_cache->to_host(payload[0]); const uint32_t byte_count = fc_cache->to_host(payload[1]); // update the amount of space fc_cache->last_byte_ack = byte_count; fc_cache->last_seq_ack = pkt_count; fc_cache->fc_received = true; } } return false; } static void tx_flow_ctrl_ack( boost::shared_ptr fc_cache, zero_copy_if::sptr send_xport, sid_t send_sid ) { if (not fc_cache->fc_received) { return; } // Time to send a flow control ACK packet // Get a send buffer managed_send_buffer::sptr fc_buff = send_xport->get_send_buff(0.0); if (not fc_buff) { UHD_LOGGER_ERROR("tx_flow_ctrl_ack") << "timed out getting a send buffer"; return; } uint32_t *pkt = fc_buff->cast(); // Load packet info vrt::if_packet_info_t packet_info; packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_ACK; packet_info.num_payload_words32 = DEVICE3_FC_PACKET_LEN_IN_WORDS32; packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); packet_info.packet_count = fc_cache->fc_ack_seqnum++; packet_info.sob = false; packet_info.eob = true; packet_info.error = false; packet_info.fc_ack = false; packet_info.sid = send_sid.get(); packet_info.has_sid = true; packet_info.has_cid = false; packet_info.has_tsi = false; packet_info.has_tsf = false; packet_info.has_tlr = false; // Load Header: fc_cache->pack(pkt, packet_info); // Update counters to include this packet size_t fc_ack_pkt_size = sizeof(uint32_t)*(packet_info.num_packet_words32); fc_cache->byte_count += fc_ack_pkt_size; // Round up to nearest word if (fc_cache->byte_count % DEVICE3_LINE_SIZE) { fc_cache->byte_count += DEVICE3_LINE_SIZE - (fc_cache->byte_count % DEVICE3_LINE_SIZE); } fc_cache->pkt_count++; // Load Payload: Packet count, and byte count pkt[packet_info.num_header_words32+DEVICE3_FC_PACKET_COUNT_OFFSET] = fc_cache->from_host(fc_cache->pkt_count); pkt[packet_info.num_header_words32+DEVICE3_FC_BYTE_COUNT_OFFSET] = fc_cache->from_host(fc_cache->byte_count); // Send the buffer over the interface fc_buff->commit(fc_ack_pkt_size); // Reset for next FC fc_cache->fc_received = false; } /*********************************************************************** * TX Async Message Functions **********************************************************************/ struct async_tx_info_t { size_t stream_channel; size_t device_channel; boost::shared_ptr async_queue; boost::shared_ptr old_async_queue; }; /*! Handle incoming messages. * Send them to the async message queue for the user to poll. * * This is run inside a uhd::task as long as this streamer lives. */ static void handle_tx_async_msgs( boost::shared_ptr async_info, zero_copy_if::sptr xport, uint32_t (*to_host)(uint32_t), void (*unpack)(const uint32_t *packet_buff, vrt::if_packet_info_t &), boost::function get_tick_rate ) { managed_recv_buffer::sptr buff = xport->get_recv_buff(); if (not buff) { return; } //extract packet info vrt::if_packet_info_t if_packet_info; if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); const uint32_t *packet_buff = buff->cast(); //unpacking can fail try { unpack(packet_buff, if_packet_info); } catch(const std::exception &ex) { UHD_LOGGER_ERROR("STREAMER") << "Error parsing async message packet: " << ex.what() ; return; } double tick_rate = get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { tick_rate = 1; } //fill in the async metadata async_metadata_t metadata; load_metadata_from_buff( to_host, metadata, if_packet_info, packet_buff, tick_rate, async_info->stream_channel ); // Filter out any flow control messages and cache the rest if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { UHD_LOGGER_ERROR("TX ASYNC MSG") << "Unexpected flow control message found in async message handling" << std::endl; } else { async_info->async_queue->push_with_pop_on_full(metadata); metadata.channel = async_info->device_channel; async_info->old_async_queue->push_with_pop_on_full(metadata); standard_async_msg_prints(metadata); } } bool device3_impl::recv_async_msg( async_metadata_t &async_metadata, double timeout ) { return _async_md->pop_with_timed_wait(async_metadata, timeout); } /*********************************************************************** * Receive streamer **********************************************************************/ void device3_impl::update_rx_streamers(double /* rate */) { for(const std::string &block_id: _rx_streamers.keys()) { UHD_RX_STREAMER_LOG() << "updating RX streamer to " << block_id; boost::shared_ptr my_streamer = boost::dynamic_pointer_cast(_rx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { tick_rate = 1.0; } my_streamer->set_tick_rate(tick_rate); double samp_rate = my_streamer->get_terminator()->get_output_samp_rate(); if (samp_rate == rfnoc::rate_node_ctrl::RATE_UNDEFINED) { samp_rate = 1.0; } double scaling = my_streamer->get_terminator()->get_output_scale_factor(); if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) { scaling = 1/32767.; } UHD_RX_STREAMER_LOG() << " New tick_rate == " << tick_rate << " New samp_rate == " << samp_rate << " New scaling == " << scaling ; my_streamer->set_tick_rate(tick_rate); my_streamer->set_samp_rate(samp_rate); my_streamer->set_scale_factor(scaling); } } } rx_streamer::sptr device3_impl::get_rx_stream(const stream_args_t &args_) { boost::mutex::scoped_lock lock(_transport_setup_mutex); stream_args_t args = sanitize_stream_args(args_); // I. Generate the channel list std::vector chan_list; std::vector chan_args; generate_channel_list(args, chan_list, chan_args); // Note: All 'args.args' are merged into chan_args now. // II. Iterate over all channels boost::shared_ptr my_streamer; // The terminator's lifetime is coupled to the streamer. // There is only one terminator. If the streamer has multiple channels, // it will be connected to each upstream block. rfnoc::rx_stream_terminator::sptr recv_terminator = rfnoc::rx_stream_terminator::make(); for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { // First, configure blocks and create transport // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; UHD_RX_STREAMER_LOG() << "chan " << stream_i << " connecting to " << block_id ; // Update args so args.args is always valid for this particular channel: args.args = chan_args[stream_i]; size_t mb_index = block_id.get_device_no(); size_t suggested_block_port = args.args.cast("block_port", rfnoc::ANY_PORT); // Access to this channel's block control uhd::rfnoc::source_block_ctrl_base::sptr blk_ctrl = boost::dynamic_pointer_cast(get_block_ctrl(block_id)); // Connect the terminator with this channel's block. size_t block_port = blk_ctrl->connect_downstream( recv_terminator, suggested_block_port, args.args ); const size_t terminator_port = recv_terminator->connect_upstream(blk_ctrl); blk_ctrl->set_downstream_port(block_port, terminator_port); recv_terminator->set_upstream_port(terminator_port, block_port); // Check if the block connection is compatible (spp and item type) check_stream_sig_compatible(blk_ctrl->get_output_signature(block_port), args, "RX"); // Setup the DSP transport hints device_addr_t rx_hints = get_rx_hints(mb_index); //allocate sid and create transport uhd::sid_t stream_address = blk_ctrl->get_address(block_port); UHD_RX_STREAMER_LOG() << "creating rx stream " << rx_hints.to_string() ; both_xports_t xport = make_transport(stream_address, RX_DATA, rx_hints); UHD_RX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec << " actual recv_buff_size = " << xport.recv_buff_size; // Configure the block // Flow control setup const size_t pkt_size = xport.recv->get_recv_frame_size(); // Leave one pkt_size space for overrun packets - TODO make this obsolete const size_t fc_window = get_rx_flow_control_window(pkt_size, xport.recv_buff_size, rx_hints) - pkt_size; const size_t fc_handle_window = std::max(1, fc_window / stream_options.rx_fc_request_freq); UHD_RX_STREAMER_LOG()<< "Flow Control Window = " << (fc_window) << ", Flow Control Handler Window = " << fc_handle_window; blk_ctrl->configure_flow_control_out( true, fc_window, rx_hints.cast("recv_pkt_limit", 0), // On rfnoc-devel, update e300_impl::get_rx_hints() to set this to 32 block_port ); // Add flow control transport boost::shared_ptr fc_cache(new rx_fc_cache_t()); fc_cache->sid = xport.send_sid; fc_cache->xport = xport.send; fc_cache->interval = fc_handle_window; if (xport.endianness == ENDIANNESS_BIG) { fc_cache->to_host = uhd::ntohx; fc_cache->from_host = uhd::htonx; fc_cache->pack = vrt::chdr::if_hdr_pack_be; fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; } else { fc_cache->to_host = uhd::wtohx; fc_cache->from_host = uhd::htowx; fc_cache->pack = vrt::chdr::if_hdr_pack_le; fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; } xport.recv = zero_copy_flow_ctrl::make ( xport.recv, NULL, [fc_cache](managed_buffer::sptr buff) { return rx_flow_ctrl( fc_cache, buff); } ); // Configure the block // Note: We need to set_destination() after writing to SR_CLEAR_TX_FC. // See noc_shell.v, in the section called Stream Source for details. // Setting SR_CLEAR_TX_FC will actually also clear the destination and // other settings. blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x1, block_port); blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_TX_FC, 0x0, block_port); // Configure routing for data blk_ctrl->set_destination(xport.send_sid.get_src(), block_port); // Configure routing for responses blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); UHD_RX_STREAMER_LOG() << "resp_out_dst_sid == " << xport.send_sid.get_src() ; // Find all upstream radio nodes and set their response in SID to the host std::vector > upstream_radio_nodes = blk_ctrl->find_upstream_node(); UHD_RX_STREAMER_LOG() << "Number of upstream radio nodes: " << upstream_radio_nodes.size(); for(const boost::shared_ptr &node: upstream_radio_nodes) { node->sr_write(uhd::rfnoc::SR_RESP_OUT_DST_SID, xport.send_sid.get_src(), block_port); } // Second, configure the streamer //make the new streamer given the samples per packet if (not my_streamer) { // To calculate the max number of samples per packet, we assume the maximum header length // to avoid fragmentation should the entire header be used. const size_t bpp = pkt_size - stream_options.rx_max_len_hdr; // bytes per packet const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item const size_t spp = std::min(args.args.cast("spp", bpp/bpi), bpp/bpi); // samples per packet UHD_RX_STREAMER_LOG() << "spp == " << spp ; my_streamer = boost::make_shared( spp, recv_terminator, xport); my_streamer->resize(chan_list.size()); } //init some streamer stuff std::string conv_endianness; if (xport.endianness == ENDIANNESS_BIG) { my_streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be); conv_endianness = "be"; } else { my_streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_le); conv_endianness = "le"; } //set the converter uhd::convert::id_type id; id.input_format = args.otw_format + "_item32_" + conv_endianness; id.num_inputs = 1; id.output_format = args.cpu_format; id.num_outputs = 1; my_streamer->set_converter(id); // Give the streamer a functor to handle flow control ACK messages my_streamer->set_xport_handle_flowctrl_ack( stream_i, [fc_cache](const uint32_t *payload) { handle_rx_flowctrl_ack( fc_cache, payload ); } ); //Give the streamer a functor to get the recv_buffer my_streamer->set_xport_chan_get_buff( stream_i, [xport](double timeout) { return xport.recv->get_recv_buff(timeout); }, true /*flush*/ ); //Give the streamer a functor to handle overruns //bind requires a weak_ptr to break the a streamer->streamer circular dependency //Using "this" is OK because we know that this device3_impl will outlive the streamer boost::weak_ptr weak_ptr(my_streamer); my_streamer->set_overflow_handler( stream_i, [recv_terminator, weak_ptr, stream_i]() { recv_terminator->handle_overrun( weak_ptr, stream_i); } ); //Give the streamer a functor issue stream cmd my_streamer->set_issue_stream_cmd( stream_i, [blk_ctrl, block_port](const stream_cmd_t& stream_cmd) { blk_ctrl->issue_stream_cmd(stream_cmd, block_port); } ); } // Notify all blocks in this chain that they are connected to an active streamer recv_terminator->set_rx_streamer(true, 0); // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. // Note that we store the streamer only once, and use its terminator's // ID to do so. _rx_streamers[recv_terminator->unique_id()] = boost::weak_ptr(my_streamer); // Sets tick rate, samp rate and scaling on this streamer. // A registered terminator is required to do this. update_rx_streamers(); post_streamer_hooks(RX_DIRECTION); return my_streamer; } /*********************************************************************** * Transmit streamer **********************************************************************/ void device3_impl::update_tx_streamers(double /* rate */) { for(const std::string &block_id: _tx_streamers.keys()) { UHD_TX_STREAMER_LOG() << "updating TX streamer: " << block_id; boost::shared_ptr my_streamer = boost::dynamic_pointer_cast(_tx_streamers[block_id].lock()); if (my_streamer) { double tick_rate = my_streamer->get_terminator()->get_tick_rate(); if (tick_rate == rfnoc::tick_node_ctrl::RATE_UNDEFINED) { tick_rate = 1.0; } double samp_rate = my_streamer->get_terminator()->get_input_samp_rate(); if (samp_rate == rfnoc::rate_node_ctrl::RATE_UNDEFINED) { samp_rate = 1.0; } double scaling = my_streamer->get_terminator()->get_input_scale_factor(); if (scaling == rfnoc::scalar_node_ctrl::SCALE_UNDEFINED) { scaling = 32767.; } UHD_TX_STREAMER_LOG() << "New tick_rate == " << tick_rate << " New samp_rate == " << samp_rate << " New scaling == " << scaling ; my_streamer->set_tick_rate(tick_rate); my_streamer->set_samp_rate(samp_rate); my_streamer->set_scale_factor(scaling); } } } tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) { boost::mutex::scoped_lock lock(_transport_setup_mutex); stream_args_t args = sanitize_stream_args(args_); // I. Generate the channel list std::vector chan_list; std::vector chan_args; generate_channel_list(args, chan_list, chan_args); // Note: All 'args.args' are merged into chan_args now. //shared async queue for all channels in streamer boost::shared_ptr async_md(new async_md_type(1000/*messages deep*/)); // II. Iterate over all channels boost::shared_ptr my_streamer; // The terminator's lifetime is coupled to the streamer. // There is only one terminator. If the streamer has multiple channels, // it will be connected to each downstream block. rfnoc::tx_stream_terminator::sptr send_terminator = rfnoc::tx_stream_terminator::make(); for (size_t stream_i = 0; stream_i < chan_list.size(); stream_i++) { // First, configure the downstream blocks and create the transports // Get block ID and mb index uhd::rfnoc::block_id_t block_id = chan_list[stream_i]; // Update args so args.args is always valid for this particular channel: args.args = chan_args[stream_i]; size_t mb_index = block_id.get_device_no(); size_t suggested_block_port = args.args.cast("block_port", rfnoc::ANY_PORT); // Access to this channel's block control uhd::rfnoc::sink_block_ctrl_base::sptr blk_ctrl = boost::dynamic_pointer_cast(get_block_ctrl(block_id)); // Connect the terminator with this channel's block. // This will throw if the connection is not possible. size_t block_port = blk_ctrl->connect_upstream( send_terminator, suggested_block_port, args.args ); const size_t terminator_port = send_terminator->connect_downstream(blk_ctrl); blk_ctrl->set_upstream_port(block_port, terminator_port); send_terminator->set_downstream_port(terminator_port, block_port); // Check if the block connection is compatible (spp and item type) check_stream_sig_compatible(blk_ctrl->get_input_signature(block_port), args, "TX"); // Setup the dsp transport hints device_addr_t tx_hints = get_tx_hints(mb_index); const size_t fifo_size = blk_ctrl->get_fifo_size(block_port); if (not tx_hints.has_key("send_buff_size")) { // Default buffer size to FIFO size tx_hints["send_buff_size"] = std::to_string(fifo_size); } // Allocate sid and create transport uhd::sid_t stream_address = blk_ctrl->get_address(block_port); UHD_TX_STREAMER_LOG() << "creating tx stream " << tx_hints.to_string() ; both_xports_t xport = make_transport(stream_address, TX_DATA, tx_hints); both_xports_t async_xport = make_transport(stream_address, ASYNC_MSG, device_addr_t("")); UHD_TX_STREAMER_LOG() << std::hex << "data_sid = " << xport.send_sid << std::dec ; // Configure flow control // This disables the FC module's output, do this before configuring flow control blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x1, block_port); blk_ctrl->sr_write(uhd::rfnoc::SR_CLEAR_RX_FC, 0x0, block_port); // Configure flow control on downstream block const size_t fc_window = std::min(tx_hints.cast("send_buff_size", fifo_size), fifo_size); const size_t fc_handle_window = std::max(1, fc_window / stream_options.tx_fc_response_freq); UHD_TX_STREAMER_LOG() << "Flow Control Window = " << fc_window << ", Flow Control Handler Window = " << fc_handle_window ; blk_ctrl->configure_flow_control_in( fc_handle_window, /*bytes*/ block_port ); // Add flow control transport boost::shared_ptr fc_cache(new tx_fc_cache_t(fc_window)); if (xport.endianness == ENDIANNESS_BIG) { fc_cache->to_host = uhd::ntohx; fc_cache->from_host = uhd::htonx; fc_cache->pack = vrt::chdr::if_hdr_pack_be; fc_cache->unpack = vrt::chdr::if_hdr_unpack_be; } else { fc_cache->to_host = uhd::wtohx; fc_cache->from_host = uhd::htowx; fc_cache->pack = vrt::chdr::if_hdr_pack_le; fc_cache->unpack = vrt::chdr::if_hdr_unpack_le; } xport.send = zero_copy_flow_ctrl::make( xport.send, [fc_cache, xport](managed_buffer::sptr buff) { return tx_flow_ctrl( fc_cache, xport.recv, buff); }, NULL ); // Configure return path for async messages blk_ctrl->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); UHD_TX_STREAMER_LOG() << "resp_in_dst_sid == " << boost::format("0x%04X") % xport.recv_sid.get_dst() ; // FIXME: Once there is a better way to map the radio block and port // to the channel or another way to receive asynchronous messages that // is not in-band, this should be removed. if (args.args.has_key("radio_id") and args.args.has_key("radio_port")) { // Find downstream radio node and set the response SID to the host uhd::rfnoc::block_id_t radio_id(args.args["radio_id"]); size_t radio_port = args.args.cast("radio_port", 0); std::vector > downstream_radio_nodes = blk_ctrl->find_downstream_node(); UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr &node: downstream_radio_nodes) { if (node->get_block_id() == radio_id) { node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), radio_port); } } } else { // FIXME: This block is preserved for legacy behavior where the // radio_id and radio_port are not provided. It fails if more // than one radio is visible downstream or the port on the radio // is not the same as the block_port. It should be removed as // soon as possible. // Find all downstream radio nodes and set their response SID to the host std::vector > downstream_radio_nodes = blk_ctrl->find_downstream_node(); UHD_TX_STREAMER_LOG() << "Number of downstream radio nodes: " << downstream_radio_nodes.size(); for(const boost::shared_ptr &node: downstream_radio_nodes) { node->sr_write(uhd::rfnoc::SR_RESP_IN_DST_SID, async_xport.recv_sid.get_dst(), block_port); } } // Second, configure the streamer now that the blocks and transports are configured //make the new streamer given the samples per packet if (not my_streamer) { // To calculate the max number of samples per packet, we assume the maximum header length // to avoid fragmentation should the entire header be used. const size_t bpp = tx_hints.cast("bpp", xport.send->get_send_frame_size()) - stream_options.tx_max_len_hdr; const size_t bpi = convert::get_bytes_per_item(args.otw_format); // bytes per item const size_t spp = std::min(args.args.cast("spp", bpp/bpi), bpp/bpi); // samples per packet UHD_TX_STREAMER_LOG() << "spp == " << spp ; my_streamer = boost::make_shared( spp, send_terminator, xport, async_xport); my_streamer->resize(chan_list.size()); } //init some streamer stuff std::string conv_endianness; if (xport.endianness == ENDIANNESS_BIG) { my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); conv_endianness = "be"; } else { my_streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_le); conv_endianness = "le"; } //set the converter uhd::convert::id_type id; id.input_format = args.cpu_format; id.num_inputs = 1; id.output_format = args.otw_format + "_item32_" + conv_endianness; id.num_outputs = 1; my_streamer->set_converter(id); boost::shared_ptr async_tx_info(new async_tx_info_t()); async_tx_info->stream_channel = args.channels[stream_i]; async_tx_info->device_channel = mb_index; async_tx_info->async_queue = async_md; async_tx_info->old_async_queue = _async_md; task::sptr async_task = task::make( [async_tx_info, async_xport, xport, send_terminator]() { handle_tx_async_msgs( async_tx_info, async_xport.recv, xport.endianness == ENDIANNESS_BIG ? uhd::ntohx : uhd::wtohx, xport.endianness == ENDIANNESS_BIG ? vrt::chdr::if_hdr_unpack_be : vrt::chdr::if_hdr_unpack_le, [send_terminator]() {return send_terminator->get_tick_rate();} ); } ); my_streamer->add_async_msg_task(async_task); //Give the streamer a functor to get the send buffer my_streamer->set_xport_chan_get_buff( stream_i, [xport](const double timeout) { return xport.send->get_send_buff(timeout); } ); //Give the streamer a functor handled received async messages my_streamer->set_async_receiver( [async_md](uhd::async_metadata_t& md, const double timeout) { return async_md->pop_with_timed_wait(md, timeout); } ); my_streamer->set_xport_chan_sid(stream_i, true, xport.send_sid); // CHDR does not support trailers my_streamer->set_enable_trailer(false); my_streamer->set_xport_chan_post_send_cb( stream_i, [fc_cache, xport]() { tx_flow_ctrl_ack( fc_cache, xport.send, xport.send_sid ); } ); } // Notify all blocks in this chain that they are connected to an active streamer send_terminator->set_tx_streamer(true, 0); // Store a weak pointer to prevent a streamer->device3_impl->streamer circular dependency. // Note that we store the streamer only once, and use its terminator's // ID to do so. _tx_streamers[send_terminator->unique_id()] = boost::weak_ptr(my_streamer); // Sets tick rate, samp rate and scaling on this streamer // A registered terminator is required to do this. update_tx_streamers(); post_streamer_hooks(TX_DIRECTION); return my_streamer; }