diff options
Diffstat (limited to 'host/lib/usrp/n230/n230_stream_manager.cpp')
-rw-r--r-- | host/lib/usrp/n230/n230_stream_manager.cpp | 551 |
1 files changed, 0 insertions, 551 deletions
diff --git a/host/lib/usrp/n230/n230_stream_manager.cpp b/host/lib/usrp/n230/n230_stream_manager.cpp deleted file mode 100644 index 5c44992ca..000000000 --- a/host/lib/usrp/n230/n230_stream_manager.cpp +++ /dev/null @@ -1,551 +0,0 @@ -// -// Copyright 2013 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#include "n230_stream_manager.hpp" - -#include "../../transport/super_recv_packet_handler.hpp" -#include "../../transport/super_send_packet_handler.hpp" -#include <uhdlib/usrp/common/async_packet_handler.hpp> -#include <uhd/transport/bounded_buffer.hpp> -#include <uhd/utils/tasks.hpp> -#include <uhd/utils/log.hpp> -#include <boost/bind.hpp> -#include <boost/make_shared.hpp> - -static const double N230_RX_SW_BUFF_FULL_FACTOR = 0.90; //Buffer should ideally be 90% full. -static const size_t N230_RX_FC_REQUEST_FREQ = 32; //per flow-control window -static const size_t N230_TX_MAX_ASYNC_MESSAGES = 1000; -static const size_t N230_TX_MAX_SPP = 4092; -static const size_t N230_TX_FC_RESPONSE_FREQ = 10; //per flow-control window - -static const uint32_t N230_EVENT_CODE_FLOW_CTRL = 0; - -namespace uhd { namespace usrp { namespace n230 { - -using namespace uhd::transport; - -n230_stream_manager::~n230_stream_manager() -{ -} - -/*********************************************************************** - * Receive streamer - **********************************************************************/ -n230_stream_manager::n230_stream_manager( - const n230_device_args_t& dev_args, - boost::shared_ptr<n230_resource_manager> resource_mgr, - boost::weak_ptr<property_tree> prop_tree -) : - _dev_args(dev_args), - _resource_mgr(resource_mgr), - _tree(prop_tree) -{ - _async_md_queue.reset(new async_md_queue_t(N230_TX_MAX_ASYNC_MESSAGES)); -} - -/*********************************************************************** - * Receive streamer - **********************************************************************/ -rx_streamer::sptr n230_stream_manager::get_rx_stream(const uhd::stream_args_t &args_) -{ - boost::mutex::scoped_lock lock(_stream_setup_mutex); - - stream_args_t args = args_; - - //setup defaults for unspecified values - if (args.otw_format.empty()) args.otw_format = "sc16"; - args.channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels; - - boost::shared_ptr<sph::recv_packet_streamer> my_streamer; - for (size_t stream_i = 0; stream_i < args.channels.size(); stream_i++) - { - const size_t chan = args.channels[stream_i]; - radio_resource_t& perif = _resource_mgr->get_radio(chan); - - //setup transport hints (default to a large recv buff) - //TODO: Propagate the device_args class into streamer in the future - device_addr_t device_addr = args.args; - if (not device_addr.has_key("recv_buff_size")) { - device_addr["recv_buff_size"] = std::to_string(_dev_args.get_recv_buff_size()); - } - if (not device_addr.has_key("recv_frame_size")) { - device_addr["recv_frame_size"] = std::to_string(_dev_args.get_recv_frame_size()); - } - if (not device_addr.has_key("num_recv_frames")) { - device_addr["num_recv_frames"] = std::to_string(_dev_args.get_num_recv_frames()); - } - - transport::udp_zero_copy::buff_params buff_params_out; - sid_t sid; - zero_copy_if::sptr xport = _resource_mgr->create_transport( - RX_DATA, chan, device_addr, sid, buff_params_out); - - //calculate packet size - static const size_t hdr_size = 0 - + vrt::max_if_hdr_words32*sizeof(uint32_t) - //+ sizeof(vrt::if_packet_info_t().tlr) //no longer using trailer - - sizeof(vrt::if_packet_info_t().cid) //no class id ever used - - sizeof(vrt::if_packet_info_t().tsi) //no int time ever used - ; - const size_t bpp = xport->get_recv_frame_size() - hdr_size; - const size_t bpi = convert::get_bytes_per_item(args.otw_format); - size_t spp = unsigned(args.args.cast<double>("spp", bpp/bpi)); - spp = std::min<size_t>(N230_TX_MAX_SPP, spp); //FPGA FIFO maximum for framing at full rate - - //make the new streamer given the samples per packet - if (not my_streamer) my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp); - my_streamer->resize(args.channels.size()); - - //init some streamer stuff - my_streamer->set_vrt_unpacker(&n230_stream_manager::_cvita_hdr_unpack); - - //set the converter - uhd::convert::id_type id; - id.input_format = args.otw_format + "_item32_be"; - id.num_inputs = 1; - id.output_format = args.cpu_format; - id.num_outputs = 1; - my_streamer->set_converter(id); - - perif.framer->clear(); - perif.framer->set_nsamps_per_packet(spp); - perif.framer->set_sid(sid.reversed().get()); - perif.framer->setup(args); - perif.ddc->setup(args); - - //Give the streamer a functor to get the recv_buffer - //bind requires a zero_copy_if::sptr to add a streamer->xport lifetime dependency - my_streamer->set_xport_chan_get_buff( - stream_i, - boost::bind(&zero_copy_if::get_recv_buff, xport, _1), - true /*flush*/ - ); - - my_streamer->set_overflow_handler(stream_i, boost::bind( - &n230_stream_manager::_handle_overflow, this, chan - )); - - my_streamer->set_issue_stream_cmd(stream_i, boost::bind( - &rx_vita_core_3000::issue_stream_command, perif.framer, _1 - )); - - const size_t fc_window = _get_rx_flow_control_window( - xport->get_recv_frame_size(), buff_params_out.recv_buff_size); - const size_t fc_handle_window = std::max<size_t>(1, fc_window / N230_RX_FC_REQUEST_FREQ); - - perif.framer->configure_flow_control(fc_window); - - //Give the streamer a functor to send flow control messages - //handle_rx_flowctrl is static and has no lifetime issues - boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); - my_streamer->set_xport_handle_flowctrl( - stream_i, boost::bind(&n230_stream_manager::_handle_rx_flowctrl, sid.get(), xport, fc_cache, _1), - fc_handle_window, - true/*init*/ - ); - - //Store a weak pointer to prevent a streamer->manager->streamer circular dependency - _rx_streamers[chan] = my_streamer; //store weak pointer - _rx_stream_cached_args[chan] = args; - - //Sets tick and samp rates on all streamer - update_tick_rate(_get_tick_rate()); - - //TODO: Find a way to remove this dependency - property_tree::sptr prop_tree = _tree.lock(); - if (prop_tree) { - //TODO: Update this to support multiple motherboards - const fs_path mb_path = "/mboards/0"; - prop_tree->access<double>(mb_path / "rx_dsps" / std::to_string(chan) / "rate" / "value").update(); - } - } - update_stream_states(); - - return my_streamer; -} - -/*********************************************************************** - * Transmit streamer - **********************************************************************/ -tx_streamer::sptr n230_stream_manager::get_tx_stream(const uhd::stream_args_t &args_) -{ - boost::mutex::scoped_lock lock(_stream_setup_mutex); - - uhd::stream_args_t args = args_; - - //setup defaults for unspecified values - if (not args.otw_format.empty() and args.otw_format != "sc16") { - throw uhd::value_error("n230_impl::get_tx_stream only supports otw_format sc16"); - } - args.otw_format = "sc16"; - args.channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels; - - //shared async queue for all channels in streamer - boost::shared_ptr<async_md_queue_t> async_md(new async_md_queue_t(N230_TX_MAX_ASYNC_MESSAGES)); - - boost::shared_ptr<sph::send_packet_streamer> my_streamer; - for (size_t stream_i = 0; stream_i < args.channels.size(); stream_i++) - { - const size_t chan = args.channels[stream_i]; - radio_resource_t& perif = _resource_mgr->get_radio(chan); - - //setup transport hints (default to a large recv buff) - //TODO: Propagate the device_args class into streamer in the future - device_addr_t device_addr = args.args; - if (not device_addr.has_key("send_buff_size")) { - device_addr["send_buff_size"] = std::to_string(_dev_args.get_send_buff_size()); - } - if (not device_addr.has_key("send_frame_size")) { - device_addr["send_frame_size"] = std::to_string(_dev_args.get_send_frame_size()); - } - if (not device_addr.has_key("num_send_frames")) { - device_addr["num_send_frames"] = std::to_string(_dev_args.get_num_send_frames()); - } - - transport::udp_zero_copy::buff_params buff_params_out; - sid_t sid; - zero_copy_if::sptr xport = _resource_mgr->create_transport( - TX_DATA, chan, device_addr, sid, buff_params_out); - - //calculate packet size - static const size_t hdr_size = 0 - + vrt::num_vrl_words32*sizeof(uint32_t) - + vrt::max_if_hdr_words32*sizeof(uint32_t) - //+ sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer - - sizeof(vrt::if_packet_info_t().cid) //no class id ever used - - sizeof(vrt::if_packet_info_t().tsi) //no int time ever used - ; - const size_t bpp = xport->get_send_frame_size() - hdr_size; - const size_t bpi = convert::get_bytes_per_item(args.otw_format); - const size_t spp = unsigned(args.args.cast<double>("spp", bpp/bpi)); - - //make the new streamer given the samples per packet - if (not my_streamer) my_streamer = boost::make_shared<sph::send_packet_streamer>(spp); - my_streamer->resize(args.channels.size()); - my_streamer->set_vrt_packer(&n230_stream_manager::_cvita_hdr_pack); - - //set the converter - uhd::convert::id_type id; - id.input_format = args.cpu_format; - id.num_inputs = 1; - id.output_format = args.otw_format + "_item32_be"; - id.num_outputs = 1; - my_streamer->set_converter(id); - - perif.deframer->clear(); - perif.deframer->setup(args); - perif.duc->setup(args); - - //flow control setup - size_t fc_window = _get_tx_flow_control_window( - bpp, device_addr.cast<size_t>("send_buff_size", _dev_args.get_send_buff_size())); - //In packets - const size_t fc_handle_window = (fc_window / N230_TX_FC_RESPONSE_FREQ); - - perif.deframer->configure_flow_control(0/*cycs off*/, fc_handle_window); - boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t()); - fc_cache->stream_channel = stream_i; - fc_cache->device_channel = chan; - fc_cache->async_queue = async_md; - fc_cache->old_async_queue = _async_md_queue; - - tick_rate_retriever_t get_tick_rate_fn = boost::bind(&n230_stream_manager::_get_tick_rate, this); - task::sptr task = task::make( - boost::bind(&n230_stream_manager::_handle_tx_async_msgs, - fc_cache, xport, get_tick_rate_fn)); - - //Give the streamer a functor to get the send buffer - //get_tx_buff_with_flowctrl is static so bind has no lifetime issues - //xport.send (sptr) is required to add streamer->data-transport lifetime dependency - //task (sptr) is required to add a streamer->async-handler lifetime dependency - my_streamer->set_xport_chan_get_buff( - stream_i, - boost::bind(&n230_stream_manager::_get_tx_buff_with_flowctrl, task, fc_cache, xport, fc_window, _1) - ); - //Give the streamer a functor handled received async messages - my_streamer->set_async_receiver( - boost::bind(&async_md_queue_t::pop_with_timed_wait, async_md, _1, _2) - ); - my_streamer->set_xport_chan_sid(stream_i, true, sid.get()); - my_streamer->set_enable_trailer(false); //TODO not implemented trailer support yet - - //Store a weak pointer to prevent a streamer->manager->streamer circular dependency - _tx_streamers[chan] = boost::weak_ptr<sph::send_packet_streamer>(my_streamer); - _tx_stream_cached_args[chan] = args; - - //Sets tick and samp rates on all streamer - update_tick_rate(_get_tick_rate()); - - //TODO: Find a way to remove this dependency - property_tree::sptr prop_tree = _tree.lock(); - if (prop_tree) { - //TODO: Update this to support multiple motherboards - const fs_path mb_path = "/mboards/0"; - prop_tree->access<double>(mb_path / "tx_dsps" / std::to_string(chan) / "rate" / "value").update(); - } - } - update_stream_states(); - - return my_streamer; -} - -/*********************************************************************** - * Async Message Receiver - **********************************************************************/ -bool n230_stream_manager::recv_async_msg(async_metadata_t &async_metadata, double timeout) -{ - return _async_md_queue->pop_with_timed_wait(async_metadata, timeout); -} - -/*********************************************************************** - * Sample Rate Updaters - **********************************************************************/ -void n230_stream_manager::update_rx_samp_rate(const size_t dspno, const double rate) -{ - boost::shared_ptr<sph::recv_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[dspno].lock()); - if (not my_streamer) return; - my_streamer->set_samp_rate(rate); - const double adj = _resource_mgr->get_radio(dspno).ddc->get_scaling_adjustment(); - my_streamer->set_scale_factor(adj); -} - -void n230_stream_manager::update_tx_samp_rate(const size_t dspno, const double rate) -{ - boost::shared_ptr<sph::send_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[dspno].lock()); - if (not my_streamer) return; - my_streamer->set_samp_rate(rate); - const double adj = _resource_mgr->get_radio(dspno).duc->get_scaling_adjustment(); - my_streamer->set_scale_factor(adj); -} - -/*********************************************************************** - * Tick Rate Updater - **********************************************************************/ -void n230_stream_manager::update_tick_rate(const double rate) -{ - for (size_t i = 0; i < fpga::NUM_RADIOS; i++) { - radio_resource_t& perif = _resource_mgr->get_radio(i); - - boost::shared_ptr<sph::recv_packet_streamer> my_rx_streamer = - boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[i].lock()); - if (my_rx_streamer) my_rx_streamer->set_tick_rate(rate); - perif.framer->set_tick_rate(rate); - - boost::shared_ptr<sph::send_packet_streamer> my_tx_streamer = - boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[i].lock()); - if (my_tx_streamer) my_tx_streamer->set_tick_rate(rate); - } -} - -/*********************************************************************** - * Stream State Updater - **********************************************************************/ -void n230_stream_manager::update_stream_states() -{ - //extract settings from state variables - const bool enb_tx0 = bool(_tx_streamers[0].lock()); - const bool enb_rx0 = bool(_rx_streamers[0].lock()); - const bool enb_tx1 = bool(_tx_streamers[1].lock()); - const bool enb_rx1 = bool(_rx_streamers[1].lock()); - - fe_state_t fe0_state = NONE_STREAMING; - if (enb_tx0 && enb_rx0) fe0_state = TXRX_STREAMING; - else if (enb_tx0) fe0_state = TX_STREAMING; - else if (enb_rx0) fe0_state = RX_STREAMING; - - fe_state_t fe1_state = NONE_STREAMING; - if (enb_tx1 && enb_rx1) fe1_state = TXRX_STREAMING; - else if (enb_tx1) fe1_state = TX_STREAMING; - else if (enb_rx1) fe1_state = RX_STREAMING; - - _resource_mgr->get_frontend_ctrl().set_stream_state(fe0_state, fe1_state); -} - -size_t n230_stream_manager::_get_rx_flow_control_window(size_t frame_size, size_t sw_buff_size) -{ - double sw_buff_max = sw_buff_size * N230_RX_SW_BUFF_FULL_FACTOR; - size_t window_in_pkts = (static_cast<size_t>(sw_buff_max) / frame_size); - if (window_in_pkts == 0) { - throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size."); - } - return window_in_pkts; -} - -void n230_stream_manager::_handle_overflow(const size_t i) -{ - boost::shared_ptr<sph::recv_packet_streamer> my_streamer = - boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[i].lock()); - if (my_streamer->get_num_channels() == 2) { - //MIMO - //find out if we were in continuous mode before stopping - const bool in_continuous_streaming_mode = _resource_mgr->get_radio(i).framer->in_continuous_streaming_mode(); - //stop streaming - my_streamer->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); - //restart streaming - if (in_continuous_streaming_mode) { - stream_cmd_t stream_cmd(stream_cmd_t::STREAM_MODE_START_CONTINUOUS); - stream_cmd.stream_now = false; - stream_cmd.time_spec = _resource_mgr->get_radio(i).time->get_time_now() + time_spec_t(0.01); - my_streamer->issue_stream_cmd(stream_cmd); - } - } else { - _resource_mgr->get_radio(i).framer->handle_overflow(); - } -} - -void n230_stream_manager::_handle_rx_flowctrl( - const sid_t& sid, - zero_copy_if::sptr xport, - boost::shared_ptr<rx_fc_cache_t> fc_cache, - const size_t last_seq) -{ - static const size_t RXFC_PACKET_LEN_IN_WORDS = 2; - static const size_t RXFC_CMD_CODE_OFFSET = 0; - static const size_t RXFC_SEQ_NUM_OFFSET = 1; - - managed_send_buffer::sptr buff = xport->get_send_buff(0.0); - if (not buff) { - throw uhd::runtime_error("handle_rx_flowctrl timed out getting a send buffer"); - } - uint32_t *pkt = buff->cast<uint32_t *>(); - - //recover seq32 - size_t& seq_sw = fc_cache->last_seq_in; - const size_t seq_hw = seq_sw & HW_SEQ_NUM_MASK; - if (last_seq < seq_hw) seq_sw += (HW_SEQ_NUM_MASK + 1); - seq_sw &= ~HW_SEQ_NUM_MASK; - seq_sw |= last_seq; - - //load packet info - vrt::if_packet_info_t packet_info; - packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_CONTEXT; - packet_info.num_payload_words32 = RXFC_PACKET_LEN_IN_WORDS; - packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t); - packet_info.packet_count = seq_sw; - packet_info.sob = false; - packet_info.eob = false; - packet_info.sid = sid.get(); - packet_info.has_sid = true; - packet_info.has_cid = false; - packet_info.has_tsi = false; - packet_info.has_tsf = false; - packet_info.has_tlr = false; - - //load header - _cvita_hdr_pack(pkt, packet_info); - - //load payload - pkt[packet_info.num_header_words32 + RXFC_CMD_CODE_OFFSET] = uhd::htonx<uint32_t>(N230_EVENT_CODE_FLOW_CTRL); - pkt[packet_info.num_header_words32 + RXFC_SEQ_NUM_OFFSET] = uhd::htonx<uint32_t>(seq_sw); - - //send the buffer over the interface - buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32)); -} - -void n230_stream_manager::_handle_tx_async_msgs( - boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr xport, - tick_rate_retriever_t get_tick_rate) -{ - managed_recv_buffer::sptr buff = xport->get_recv_buff(); - if (not buff) return; - - //extract packet info - vrt::if_packet_info_t if_packet_info; - if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); - const uint32_t *packet_buff = buff->cast<const uint32_t *>(); - - //unpacking can fail - uint32_t (*endian_conv)(uint32_t) = uhd::ntohx; - try { - _cvita_hdr_unpack(packet_buff, if_packet_info); - endian_conv = uhd::ntohx; - } catch(const std::exception &ex) { - UHD_LOGGER_ERROR("N230") << "Error parsing async message packet: " << ex.what() ; - return; - } - - //fill in the async metadata - async_metadata_t metadata; - load_metadata_from_buff( - endian_conv, metadata, if_packet_info, packet_buff, - get_tick_rate(), fc_cache->stream_channel); - - //The FC response and the burst ack are two indicators that the radio - //consumed packets. Use them to update the FC metadata - if (metadata.event_code == N230_EVENT_CODE_FLOW_CTRL or - metadata.event_code == async_metadata_t::EVENT_CODE_BURST_ACK - ) { - const size_t seq = metadata.user_payload[0]; - fc_cache->seq_queue.push_with_pop_on_full(seq); - } - - //FC responses don't propagate up to the user so filter them here - if (metadata.event_code != N230_EVENT_CODE_FLOW_CTRL) { - fc_cache->async_queue->push_with_pop_on_full(metadata); - metadata.channel = fc_cache->device_channel; - fc_cache->old_async_queue->push_with_pop_on_full(metadata); - standard_async_msg_prints(metadata); - } -} - -managed_send_buffer::sptr n230_stream_manager::_get_tx_buff_with_flowctrl( - task::sptr /*holds ref*/, - boost::shared_ptr<tx_fc_cache_t> fc_cache, - zero_copy_if::sptr xport, - size_t fc_pkt_window, - const double timeout) -{ - while (true) - { - const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - (fc_cache->last_seq_ack & HW_SEQ_NUM_MASK); - if ((delta & HW_SEQ_NUM_MASK) <= fc_pkt_window) break; - - const bool ok = fc_cache->seq_queue.pop_with_timed_wait(fc_cache->last_seq_ack, timeout); - if (not ok) return managed_send_buffer::sptr(); //timeout waiting for flow control - } - - managed_send_buffer::sptr buff = xport->get_send_buff(timeout); - if (buff) fc_cache->last_seq_out++; //update seq, this will actually be a send - return buff; -} - -size_t n230_stream_manager::_get_tx_flow_control_window( - size_t payload_size, - size_t hw_buff_size) -{ - size_t window_in_pkts = hw_buff_size / payload_size; - if (window_in_pkts == 0) { - throw uhd::value_error("send_buff_size must be larger than the send_frame_size."); - } - return window_in_pkts; -} - -double n230_stream_manager::_get_tick_rate() -{ - return _resource_mgr->get_clk_pps_ctrl().get_tick_rate(); -} - -void n230_stream_manager::_cvita_hdr_unpack( - const uint32_t *packet_buff, - vrt::if_packet_info_t &if_packet_info) -{ - if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR; - return vrt::if_hdr_unpack_be(packet_buff, if_packet_info); -} - -void n230_stream_manager::_cvita_hdr_pack( - uint32_t *packet_buff, - vrt::if_packet_info_t &if_packet_info) -{ - if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR; - return vrt::if_hdr_pack_be(packet_buff, if_packet_info); -} - -}}} //namespace |