aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/n230/n230_stream_manager.cpp
diff options
context:
space:
mode:
authorAshish Chaudhari <ashish@ettus.com>2016-01-05 09:52:22 -0800
committerAshish Chaudhari <ashish@ettus.com>2016-01-05 17:35:14 -0800
commit863ca545d629e0e40a21e085572440e764de882d (patch)
treeb33a602f6308c4c0838131239ab5a600a3546355 /host/lib/usrp/n230/n230_stream_manager.cpp
parentb3077692a3f49e517510b53e99129dd4f24209c7 (diff)
downloaduhd-863ca545d629e0e40a21e085572440e764de882d.tar.gz
uhd-863ca545d629e0e40a21e085572440e764de882d.tar.bz2
uhd-863ca545d629e0e40a21e085572440e764de882d.zip
n230: Added N230 device support
Diffstat (limited to 'host/lib/usrp/n230/n230_stream_manager.cpp')
-rw-r--r--host/lib/usrp/n230/n230_stream_manager.cpp562
1 files changed, 562 insertions, 0 deletions
diff --git a/host/lib/usrp/n230/n230_stream_manager.cpp b/host/lib/usrp/n230/n230_stream_manager.cpp
new file mode 100644
index 000000000..e7624ecd6
--- /dev/null
+++ b/host/lib/usrp/n230/n230_stream_manager.cpp
@@ -0,0 +1,562 @@
+//
+// Copyright 2013 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "n230_stream_manager.hpp"
+
+#include "../../transport/super_recv_packet_handler.hpp"
+#include "../../transport/super_send_packet_handler.hpp"
+#include "async_packet_handler.hpp"
+#include <uhd/transport/bounded_buffer.hpp>
+#include <boost/bind.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/foreach.hpp>
+#include <boost/make_shared.hpp>
+
+static const double N230_RX_SW_BUFF_FULL_FACTOR = 0.90; //Buffer should ideally be 90% full.
+static const size_t N230_RX_FC_REQUEST_FREQ = 32; //per flow-control window
+static const size_t N230_TX_MAX_ASYNC_MESSAGES = 1000;
+static const size_t N230_TX_MAX_SPP = 4092;
+static const size_t N230_TX_FC_RESPONSE_FREQ = 10; //per flow-control window
+
+static const boost::uint32_t N230_EVENT_CODE_FLOW_CTRL = 0;
+
+namespace uhd { namespace usrp { namespace n230 {
+
+using namespace uhd::transport;
+
+n230_stream_manager::~n230_stream_manager()
+{
+}
+
+/***********************************************************************
+ * Receive streamer
+ **********************************************************************/
+n230_stream_manager::n230_stream_manager(
+ const n230_device_args_t& dev_args,
+ boost::shared_ptr<n230_resource_manager> resource_mgr,
+ boost::weak_ptr<property_tree> prop_tree
+) :
+ _dev_args(dev_args),
+ _resource_mgr(resource_mgr),
+ _tree(prop_tree)
+{
+ _async_md_queue.reset(new async_md_queue_t(N230_TX_MAX_ASYNC_MESSAGES));
+}
+
+/***********************************************************************
+ * Receive streamer
+ **********************************************************************/
+rx_streamer::sptr n230_stream_manager::get_rx_stream(const uhd::stream_args_t &args_)
+{
+ boost::mutex::scoped_lock lock(_stream_setup_mutex);
+
+ stream_args_t args = args_;
+
+ //setup defaults for unspecified values
+ if (args.otw_format.empty()) args.otw_format = "sc16";
+ args.channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels;
+
+ boost::shared_ptr<sph::recv_packet_streamer> my_streamer;
+ for (size_t stream_i = 0; stream_i < args.channels.size(); stream_i++)
+ {
+ const size_t chan = args.channels[stream_i];
+ radio_resource_t& perif = _resource_mgr->get_radio(chan);
+
+ //setup transport hints (default to a large recv buff)
+ //TODO: Propagate the device_args class into streamer in the future
+ device_addr_t device_addr = args.args;
+ if (not device_addr.has_key("recv_buff_size")) {
+ device_addr["recv_buff_size"] = boost::lexical_cast<std::string>(_dev_args.get_recv_buff_size());
+ }
+ if (not device_addr.has_key("recv_frame_size")) {
+ device_addr["recv_frame_size"] = boost::lexical_cast<std::string>(_dev_args.get_recv_frame_size());
+ }
+ if (not device_addr.has_key("num_recv_frames")) {
+ device_addr["num_recv_frames"] = boost::lexical_cast<std::string>(_dev_args.get_num_recv_frames());
+ }
+
+ transport::udp_zero_copy::buff_params buff_params_out;
+ sid_t sid;
+ zero_copy_if::sptr xport = _resource_mgr->create_transport(
+ RX_DATA, chan, device_addr, sid, buff_params_out);
+
+ //calculate packet size
+ static const size_t hdr_size = 0
+ + vrt::max_if_hdr_words32*sizeof(boost::uint32_t)
+ //+ sizeof(vrt::if_packet_info_t().tlr) //no longer using trailer
+ - sizeof(vrt::if_packet_info_t().cid) //no class id ever used
+ - sizeof(vrt::if_packet_info_t().tsi) //no int time ever used
+ ;
+ const size_t bpp = xport->get_recv_frame_size() - hdr_size;
+ const size_t bpi = convert::get_bytes_per_item(args.otw_format);
+ size_t spp = unsigned(args.args.cast<double>("spp", bpp/bpi));
+ spp = std::min<size_t>(N230_TX_MAX_SPP, spp); //FPGA FIFO maximum for framing at full rate
+
+ //make the new streamer given the samples per packet
+ if (not my_streamer) my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp);
+ my_streamer->resize(args.channels.size());
+
+ //init some streamer stuff
+ my_streamer->set_vrt_unpacker(&n230_stream_manager::_cvita_hdr_unpack);
+
+ //set the converter
+ uhd::convert::id_type id;
+ id.input_format = args.otw_format + "_item32_be";
+ id.num_inputs = 1;
+ id.output_format = args.cpu_format;
+ id.num_outputs = 1;
+ my_streamer->set_converter(id);
+
+ perif.framer->clear();
+ perif.framer->set_nsamps_per_packet(spp);
+ perif.framer->set_sid(sid.reversed().get());
+ perif.framer->setup(args);
+ perif.ddc->setup(args);
+
+ //Give the streamer a functor to get the recv_buffer
+ //bind requires a zero_copy_if::sptr to add a streamer->xport lifetime dependency
+ my_streamer->set_xport_chan_get_buff(
+ stream_i,
+ boost::bind(&zero_copy_if::get_recv_buff, xport, _1),
+ true /*flush*/
+ );
+
+ my_streamer->set_overflow_handler(stream_i, boost::bind(
+ &n230_stream_manager::_handle_overflow, this, chan
+ ));
+
+ my_streamer->set_issue_stream_cmd(stream_i, boost::bind(
+ &rx_vita_core_3000::issue_stream_command, perif.framer, _1
+ ));
+
+ const size_t fc_window = _get_rx_flow_control_window(
+ xport->get_recv_frame_size(), buff_params_out.recv_buff_size);
+ const size_t fc_handle_window = std::max<size_t>(1, fc_window / N230_RX_FC_REQUEST_FREQ);
+
+ perif.framer->configure_flow_control(fc_window);
+
+ //Give the streamer a functor to send flow control messages
+ //handle_rx_flowctrl is static and has no lifetime issues
+ boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t());
+ my_streamer->set_xport_handle_flowctrl(
+ stream_i, boost::bind(&n230_stream_manager::_handle_rx_flowctrl, sid.get(), xport, fc_cache, _1),
+ fc_handle_window,
+ true/*init*/
+ );
+
+ //Store a weak pointer to prevent a streamer->manager->streamer circular dependency
+ _rx_streamers[chan] = my_streamer; //store weak pointer
+ _rx_stream_cached_args[chan] = args;
+
+ //Sets tick and samp rates on all streamer
+ update_tick_rate(_get_tick_rate());
+
+ //TODO: Find a way to remove this dependency
+ property_tree::sptr prop_tree = _tree.lock();
+ if (prop_tree) {
+ //TODO: Update this to support multiple motherboards
+ const fs_path mb_path = "/mboards/0";
+ prop_tree->access<double>(mb_path / "rx_dsps" / boost::lexical_cast<std::string>(chan) / "rate" / "value").update();
+ }
+ }
+ update_stream_states();
+
+ return my_streamer;
+}
+
+/***********************************************************************
+ * Transmit streamer
+ **********************************************************************/
+tx_streamer::sptr n230_stream_manager::get_tx_stream(const uhd::stream_args_t &args_)
+{
+ boost::mutex::scoped_lock lock(_stream_setup_mutex);
+
+ uhd::stream_args_t args = args_;
+
+ //setup defaults for unspecified values
+ if (not args.otw_format.empty() and args.otw_format != "sc16") {
+ throw uhd::value_error("n230_impl::get_tx_stream only supports otw_format sc16");
+ }
+ args.otw_format = "sc16";
+ args.channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels;
+
+ //shared async queue for all channels in streamer
+ boost::shared_ptr<async_md_queue_t> async_md(new async_md_queue_t(N230_TX_MAX_ASYNC_MESSAGES));
+
+ boost::shared_ptr<sph::send_packet_streamer> my_streamer;
+ for (size_t stream_i = 0; stream_i < args.channels.size(); stream_i++)
+ {
+ const size_t chan = args.channels[stream_i];
+ radio_resource_t& perif = _resource_mgr->get_radio(chan);
+
+ //setup transport hints (default to a large recv buff)
+ //TODO: Propagate the device_args class into streamer in the future
+ device_addr_t device_addr = args.args;
+ if (not device_addr.has_key("send_buff_size")) {
+ device_addr["send_buff_size"] = boost::lexical_cast<std::string>(_dev_args.get_send_buff_size());
+ }
+ if (not device_addr.has_key("send_frame_size")) {
+ device_addr["send_frame_size"] = boost::lexical_cast<std::string>(_dev_args.get_send_frame_size());
+ }
+ if (not device_addr.has_key("num_send_frames")) {
+ device_addr["num_send_frames"] = boost::lexical_cast<std::string>(_dev_args.get_num_send_frames());
+ }
+
+ transport::udp_zero_copy::buff_params buff_params_out;
+ sid_t sid;
+ zero_copy_if::sptr xport = _resource_mgr->create_transport(
+ TX_DATA, chan, device_addr, sid, buff_params_out);
+
+ //calculate packet size
+ static const size_t hdr_size = 0
+ + vrt::num_vrl_words32*sizeof(boost::uint32_t)
+ + vrt::max_if_hdr_words32*sizeof(boost::uint32_t)
+ //+ sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer
+ - sizeof(vrt::if_packet_info_t().cid) //no class id ever used
+ - sizeof(vrt::if_packet_info_t().tsi) //no int time ever used
+ ;
+ const size_t bpp = xport->get_send_frame_size() - hdr_size;
+ const size_t bpi = convert::get_bytes_per_item(args.otw_format);
+ const size_t spp = unsigned(args.args.cast<double>("spp", bpp/bpi));
+
+ //make the new streamer given the samples per packet
+ if (not my_streamer) my_streamer = boost::make_shared<sph::send_packet_streamer>(spp);
+ my_streamer->resize(args.channels.size());
+ my_streamer->set_vrt_packer(&n230_stream_manager::_cvita_hdr_pack);
+
+ //set the converter
+ uhd::convert::id_type id;
+ id.input_format = args.cpu_format;
+ id.num_inputs = 1;
+ id.output_format = args.otw_format + "_item32_be";
+ id.num_outputs = 1;
+ my_streamer->set_converter(id);
+
+ perif.deframer->clear();
+ perif.deframer->setup(args);
+ perif.duc->setup(args);
+
+ //flow control setup
+ size_t fc_window = _get_tx_flow_control_window(
+ bpp, device_addr.cast<size_t>("send_buff_size", _dev_args.get_send_buff_size()));
+ //In packets
+ const size_t fc_handle_window = (fc_window / N230_TX_FC_RESPONSE_FREQ);
+
+ perif.deframer->configure_flow_control(0/*cycs off*/, fc_handle_window);
+ boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t());
+ fc_cache->stream_channel = stream_i;
+ fc_cache->device_channel = chan;
+ fc_cache->async_queue = async_md;
+ fc_cache->old_async_queue = _async_md_queue;
+
+ tick_rate_retriever_t get_tick_rate_fn = boost::bind(&n230_stream_manager::_get_tick_rate, this);
+ task::sptr task = task::make(
+ boost::bind(&n230_stream_manager::_handle_tx_async_msgs,
+ fc_cache, xport, get_tick_rate_fn));
+
+ //Give the streamer a functor to get the send buffer
+ //get_tx_buff_with_flowctrl is static so bind has no lifetime issues
+ //xport.send (sptr) is required to add streamer->data-transport lifetime dependency
+ //task (sptr) is required to add a streamer->async-handler lifetime dependency
+ my_streamer->set_xport_chan_get_buff(
+ stream_i,
+ boost::bind(&n230_stream_manager::_get_tx_buff_with_flowctrl, task, fc_cache, xport, fc_window, _1)
+ );
+ //Give the streamer a functor handled received async messages
+ my_streamer->set_async_receiver(
+ boost::bind(&async_md_queue_t::pop_with_timed_wait, async_md, _1, _2)
+ );
+ my_streamer->set_xport_chan_sid(stream_i, true, sid.get());
+ my_streamer->set_enable_trailer(false); //TODO not implemented trailer support yet
+
+ //Store a weak pointer to prevent a streamer->manager->streamer circular dependency
+ _tx_streamers[chan] = boost::weak_ptr<sph::send_packet_streamer>(my_streamer);
+ _tx_stream_cached_args[chan] = args;
+
+ //Sets tick and samp rates on all streamer
+ update_tick_rate(_get_tick_rate());
+
+ //TODO: Find a way to remove this dependency
+ property_tree::sptr prop_tree = _tree.lock();
+ if (prop_tree) {
+ //TODO: Update this to support multiple motherboards
+ const fs_path mb_path = "/mboards/0";
+ prop_tree->access<double>(mb_path / "tx_dsps" / boost::lexical_cast<std::string>(chan) / "rate" / "value").update();
+ }
+ }
+ update_stream_states();
+
+ return my_streamer;
+}
+
+/***********************************************************************
+ * Async Message Receiver
+ **********************************************************************/
+bool n230_stream_manager::recv_async_msg(async_metadata_t &async_metadata, double timeout)
+{
+ return _async_md_queue->pop_with_timed_wait(async_metadata, timeout);
+}
+
+/***********************************************************************
+ * Sample Rate Updaters
+ **********************************************************************/
+void n230_stream_manager::update_rx_samp_rate(const size_t dspno, const double rate)
+{
+ boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[dspno].lock());
+ if (not my_streamer) return;
+ my_streamer->set_samp_rate(rate);
+ const double adj = _resource_mgr->get_radio(dspno).ddc->get_scaling_adjustment();
+ my_streamer->set_scale_factor(adj);
+}
+
+void n230_stream_manager::update_tx_samp_rate(const size_t dspno, const double rate)
+{
+ boost::shared_ptr<sph::send_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[dspno].lock());
+ if (not my_streamer) return;
+ my_streamer->set_samp_rate(rate);
+ const double adj = _resource_mgr->get_radio(dspno).duc->get_scaling_adjustment();
+ my_streamer->set_scale_factor(adj);
+}
+
+/***********************************************************************
+ * Tick Rate Updater
+ **********************************************************************/
+void n230_stream_manager::update_tick_rate(const double rate)
+{
+ for (size_t i = 0; i < fpga::NUM_RADIOS; i++) {
+ radio_resource_t& perif = _resource_mgr->get_radio(i);
+
+ boost::shared_ptr<sph::recv_packet_streamer> my_rx_streamer =
+ boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[i].lock());
+ if (my_rx_streamer) my_rx_streamer->set_tick_rate(rate);
+ perif.framer->set_tick_rate(rate);
+
+ boost::shared_ptr<sph::send_packet_streamer> my_tx_streamer =
+ boost::dynamic_pointer_cast<sph::send_packet_streamer>(_tx_streamers[i].lock());
+ if (my_tx_streamer) my_tx_streamer->set_tick_rate(rate);
+ }
+}
+
+/***********************************************************************
+ * Stream State Updater
+ **********************************************************************/
+void n230_stream_manager::update_stream_states()
+{
+ //extract settings from state variables
+ const bool enb_tx0 = bool(_tx_streamers[0].lock());
+ const bool enb_rx0 = bool(_rx_streamers[0].lock());
+ const bool enb_tx1 = bool(_tx_streamers[1].lock());
+ const bool enb_rx1 = bool(_rx_streamers[1].lock());
+
+ fe_state_t fe0_state = NONE_STREAMING;
+ if (enb_tx0 && enb_rx0) fe0_state = TXRX_STREAMING;
+ else if (enb_tx0) fe0_state = TX_STREAMING;
+ else if (enb_rx0) fe0_state = RX_STREAMING;
+
+ fe_state_t fe1_state = NONE_STREAMING;
+ if (enb_tx1 && enb_rx1) fe1_state = TXRX_STREAMING;
+ else if (enb_tx1) fe1_state = TX_STREAMING;
+ else if (enb_rx1) fe1_state = RX_STREAMING;
+
+ _resource_mgr->get_frontend_ctrl().set_stream_state(fe0_state, fe1_state);
+}
+
+size_t n230_stream_manager::_get_rx_flow_control_window(size_t frame_size, size_t sw_buff_size)
+{
+ double sw_buff_max = sw_buff_size * N230_RX_SW_BUFF_FULL_FACTOR;
+ size_t window_in_pkts = (static_cast<size_t>(sw_buff_max) / frame_size);
+ if (window_in_pkts == 0) {
+ throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size.");
+ }
+ return window_in_pkts;
+}
+
+void n230_stream_manager::_handle_overflow(const size_t i)
+{
+ boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_rx_streamers[i].lock());
+ if (my_streamer->get_num_channels() == 2) {
+ //MIMO
+ //find out if we were in continuous mode before stopping
+ const bool in_continuous_streaming_mode = _resource_mgr->get_radio(i).framer->in_continuous_streaming_mode();
+ //stop streaming
+ my_streamer->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ //restart streaming
+ if (in_continuous_streaming_mode) {
+ stream_cmd_t stream_cmd(stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
+ stream_cmd.stream_now = false;
+ stream_cmd.time_spec = _resource_mgr->get_radio(i).time->get_time_now() + time_spec_t(0.01);
+ my_streamer->issue_stream_cmd(stream_cmd);
+ }
+ } else {
+ _resource_mgr->get_radio(i).framer->handle_overflow();
+ }
+}
+
+void n230_stream_manager::_handle_rx_flowctrl(
+ const sid_t& sid,
+ zero_copy_if::sptr xport,
+ boost::shared_ptr<rx_fc_cache_t> fc_cache,
+ const size_t last_seq)
+{
+ static const size_t RXFC_PACKET_LEN_IN_WORDS = 2;
+ static const size_t RXFC_CMD_CODE_OFFSET = 0;
+ static const size_t RXFC_SEQ_NUM_OFFSET = 1;
+
+ managed_send_buffer::sptr buff = xport->get_send_buff(0.0);
+ if (not buff) {
+ throw uhd::runtime_error("handle_rx_flowctrl timed out getting a send buffer");
+ }
+ boost::uint32_t *pkt = buff->cast<boost::uint32_t *>();
+
+ //recover seq32
+ size_t& seq_sw = fc_cache->last_seq_in;
+ const size_t seq_hw = seq_sw & HW_SEQ_NUM_MASK;
+ if (last_seq < seq_hw) seq_sw += (HW_SEQ_NUM_MASK + 1);
+ seq_sw &= ~HW_SEQ_NUM_MASK;
+ seq_sw |= last_seq;
+
+ //load packet info
+ vrt::if_packet_info_t packet_info;
+ packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_CONTEXT;
+ packet_info.num_payload_words32 = RXFC_PACKET_LEN_IN_WORDS;
+ packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(boost::uint32_t);
+ packet_info.packet_count = seq_sw;
+ packet_info.sob = false;
+ packet_info.eob = false;
+ packet_info.sid = sid.get();
+ packet_info.has_sid = true;
+ packet_info.has_cid = false;
+ packet_info.has_tsi = false;
+ packet_info.has_tsf = false;
+ packet_info.has_tlr = false;
+
+ //load header
+ _cvita_hdr_pack(pkt, packet_info);
+
+ //load payload
+ pkt[packet_info.num_header_words32 + RXFC_CMD_CODE_OFFSET] = uhd::htonx<boost::uint32_t>(N230_EVENT_CODE_FLOW_CTRL);
+ pkt[packet_info.num_header_words32 + RXFC_SEQ_NUM_OFFSET] = uhd::htonx<boost::uint32_t>(seq_sw);
+
+ //send the buffer over the interface
+ buff->commit(sizeof(boost::uint32_t)*(packet_info.num_packet_words32));
+}
+
+void n230_stream_manager::_handle_tx_async_msgs(
+ boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ zero_copy_if::sptr xport,
+ tick_rate_retriever_t get_tick_rate)
+{
+ managed_recv_buffer::sptr buff = xport->get_recv_buff();
+ if (not buff) return;
+
+ //extract packet info
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
+ const boost::uint32_t *packet_buff = buff->cast<const boost::uint32_t *>();
+
+ //unpacking can fail
+ uint32_t (*endian_conv)(uint32_t) = uhd::ntohx;
+ try {
+ _cvita_hdr_unpack(packet_buff, if_packet_info);
+ endian_conv = uhd::ntohx;
+ } catch(const std::exception &ex) {
+ UHD_MSG(error) << "Error parsing async message packet: " << ex.what() << std::endl;
+ return;
+ }
+
+ //fill in the async metadata
+ async_metadata_t metadata;
+ load_metadata_from_buff(
+ endian_conv, metadata, if_packet_info, packet_buff,
+ get_tick_rate(), fc_cache->stream_channel);
+
+ //The FC response and the burst ack are two indicators that the radio
+ //consumed packets. Use them to update the FC metadata
+ if (metadata.event_code == N230_EVENT_CODE_FLOW_CTRL or
+ metadata.event_code == async_metadata_t::EVENT_CODE_BURST_ACK
+ ) {
+ const size_t seq = metadata.user_payload[0];
+ fc_cache->seq_queue.push_with_pop_on_full(seq);
+ }
+
+ //FC responses don't propagate up to the user so filter them here
+ if (metadata.event_code != N230_EVENT_CODE_FLOW_CTRL) {
+ fc_cache->async_queue->push_with_pop_on_full(metadata);
+ metadata.channel = fc_cache->device_channel;
+ fc_cache->old_async_queue->push_with_pop_on_full(metadata);
+ standard_async_msg_prints(metadata);
+ }
+}
+
+managed_send_buffer::sptr n230_stream_manager::_get_tx_buff_with_flowctrl(
+ task::sptr /*holds ref*/,
+ boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ zero_copy_if::sptr xport,
+ size_t fc_pkt_window,
+ const double timeout)
+{
+ while (true)
+ {
+ const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - (fc_cache->last_seq_ack & HW_SEQ_NUM_MASK);
+ if ((delta & HW_SEQ_NUM_MASK) <= fc_pkt_window) break;
+
+ const bool ok = fc_cache->seq_queue.pop_with_timed_wait(fc_cache->last_seq_ack, timeout);
+ if (not ok) return managed_send_buffer::sptr(); //timeout waiting for flow control
+ }
+
+ managed_send_buffer::sptr buff = xport->get_send_buff(timeout);
+ if (buff) fc_cache->last_seq_out++; //update seq, this will actually be a send
+ return buff;
+}
+
+size_t n230_stream_manager::_get_tx_flow_control_window(
+ size_t payload_size,
+ size_t hw_buff_size)
+{
+ size_t window_in_pkts = hw_buff_size / payload_size;
+ if (window_in_pkts == 0) {
+ throw uhd::value_error("send_buff_size must be larger than the send_frame_size.");
+ }
+ return window_in_pkts;
+}
+
+double n230_stream_manager::_get_tick_rate()
+{
+ return _resource_mgr->get_clk_pps_ctrl().get_tick_rate();
+}
+
+void n230_stream_manager::_cvita_hdr_unpack(
+ const boost::uint32_t *packet_buff,
+ vrt::if_packet_info_t &if_packet_info)
+{
+ if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR;
+ return vrt::if_hdr_unpack_be(packet_buff, if_packet_info);
+}
+
+void n230_stream_manager::_cvita_hdr_pack(
+ boost::uint32_t *packet_buff,
+ vrt::if_packet_info_t &if_packet_info)
+{
+ if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR;
+ return vrt::if_hdr_pack_be(packet_buff, if_packet_info);
+}
+
+}}} //namespace