//
// Copyright 2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//
#include "n230_stream_manager.hpp"
#include "../../transport/super_recv_packet_handler.hpp"
#include "../../transport/super_send_packet_handler.hpp"
#include "async_packet_handler.hpp"
#include
#include
#include
#include
#include
static const double N230_RX_SW_BUFF_FULL_FACTOR = 0.90; //Buffer should ideally be 90% full.
static const size_t N230_RX_FC_REQUEST_FREQ = 32; //per flow-control window
static const size_t N230_TX_MAX_ASYNC_MESSAGES = 1000;
static const size_t N230_TX_MAX_SPP = 4092;
static const size_t N230_TX_FC_RESPONSE_FREQ = 10; //per flow-control window
static const uint32_t N230_EVENT_CODE_FLOW_CTRL = 0;
namespace uhd { namespace usrp { namespace n230 {
using namespace uhd::transport;
n230_stream_manager::~n230_stream_manager()
{
}
/***********************************************************************
* Receive streamer
**********************************************************************/
n230_stream_manager::n230_stream_manager(
const n230_device_args_t& dev_args,
boost::shared_ptr resource_mgr,
boost::weak_ptr prop_tree
) :
_dev_args(dev_args),
_resource_mgr(resource_mgr),
_tree(prop_tree)
{
_async_md_queue.reset(new async_md_queue_t(N230_TX_MAX_ASYNC_MESSAGES));
}
/***********************************************************************
* Receive streamer
**********************************************************************/
rx_streamer::sptr n230_stream_manager::get_rx_stream(const uhd::stream_args_t &args_)
{
boost::mutex::scoped_lock lock(_stream_setup_mutex);
stream_args_t args = args_;
//setup defaults for unspecified values
if (args.otw_format.empty()) args.otw_format = "sc16";
args.channels = args.channels.empty()? std::vector(1, 0) : args.channels;
boost::shared_ptr my_streamer;
for (size_t stream_i = 0; stream_i < args.channels.size(); stream_i++)
{
const size_t chan = args.channels[stream_i];
radio_resource_t& perif = _resource_mgr->get_radio(chan);
//setup transport hints (default to a large recv buff)
//TODO: Propagate the device_args class into streamer in the future
device_addr_t device_addr = args.args;
if (not device_addr.has_key("recv_buff_size")) {
device_addr["recv_buff_size"] = boost::lexical_cast(_dev_args.get_recv_buff_size());
}
if (not device_addr.has_key("recv_frame_size")) {
device_addr["recv_frame_size"] = boost::lexical_cast(_dev_args.get_recv_frame_size());
}
if (not device_addr.has_key("num_recv_frames")) {
device_addr["num_recv_frames"] = boost::lexical_cast(_dev_args.get_num_recv_frames());
}
transport::udp_zero_copy::buff_params buff_params_out;
sid_t sid;
zero_copy_if::sptr xport = _resource_mgr->create_transport(
RX_DATA, chan, device_addr, sid, buff_params_out);
//calculate packet size
static const size_t hdr_size = 0
+ vrt::max_if_hdr_words32*sizeof(uint32_t)
//+ sizeof(vrt::if_packet_info_t().tlr) //no longer using trailer
- sizeof(vrt::if_packet_info_t().cid) //no class id ever used
- sizeof(vrt::if_packet_info_t().tsi) //no int time ever used
;
const size_t bpp = xport->get_recv_frame_size() - hdr_size;
const size_t bpi = convert::get_bytes_per_item(args.otw_format);
size_t spp = unsigned(args.args.cast("spp", bpp/bpi));
spp = std::min(N230_TX_MAX_SPP, spp); //FPGA FIFO maximum for framing at full rate
//make the new streamer given the samples per packet
if (not my_streamer) my_streamer = boost::make_shared(spp);
my_streamer->resize(args.channels.size());
//init some streamer stuff
my_streamer->set_vrt_unpacker(&n230_stream_manager::_cvita_hdr_unpack);
//set the converter
uhd::convert::id_type id;
id.input_format = args.otw_format + "_item32_be";
id.num_inputs = 1;
id.output_format = args.cpu_format;
id.num_outputs = 1;
my_streamer->set_converter(id);
perif.framer->clear();
perif.framer->set_nsamps_per_packet(spp);
perif.framer->set_sid(sid.reversed().get());
perif.framer->setup(args);
perif.ddc->setup(args);
//Give the streamer a functor to get the recv_buffer
//bind requires a zero_copy_if::sptr to add a streamer->xport lifetime dependency
my_streamer->set_xport_chan_get_buff(
stream_i,
boost::bind(&zero_copy_if::get_recv_buff, xport, _1),
true /*flush*/
);
my_streamer->set_overflow_handler(stream_i, boost::bind(
&n230_stream_manager::_handle_overflow, this, chan
));
my_streamer->set_issue_stream_cmd(stream_i, boost::bind(
&rx_vita_core_3000::issue_stream_command, perif.framer, _1
));
const size_t fc_window = _get_rx_flow_control_window(
xport->get_recv_frame_size(), buff_params_out.recv_buff_size);
const size_t fc_handle_window = std::max(1, fc_window / N230_RX_FC_REQUEST_FREQ);
perif.framer->configure_flow_control(fc_window);
//Give the streamer a functor to send flow control messages
//handle_rx_flowctrl is static and has no lifetime issues
boost::shared_ptr fc_cache(new rx_fc_cache_t());
my_streamer->set_xport_handle_flowctrl(
stream_i, boost::bind(&n230_stream_manager::_handle_rx_flowctrl, sid.get(), xport, fc_cache, _1),
fc_handle_window,
true/*init*/
);
//Store a weak pointer to prevent a streamer->manager->streamer circular dependency
_rx_streamers[chan] = my_streamer; //store weak pointer
_rx_stream_cached_args[chan] = args;
//Sets tick and samp rates on all streamer
update_tick_rate(_get_tick_rate());
//TODO: Find a way to remove this dependency
property_tree::sptr prop_tree = _tree.lock();
if (prop_tree) {
//TODO: Update this to support multiple motherboards
const fs_path mb_path = "/mboards/0";
prop_tree->access(mb_path / "rx_dsps" / boost::lexical_cast(chan) / "rate" / "value").update();
}
}
update_stream_states();
return my_streamer;
}
/***********************************************************************
* Transmit streamer
**********************************************************************/
tx_streamer::sptr n230_stream_manager::get_tx_stream(const uhd::stream_args_t &args_)
{
boost::mutex::scoped_lock lock(_stream_setup_mutex);
uhd::stream_args_t args = args_;
//setup defaults for unspecified values
if (not args.otw_format.empty() and args.otw_format != "sc16") {
throw uhd::value_error("n230_impl::get_tx_stream only supports otw_format sc16");
}
args.otw_format = "sc16";
args.channels = args.channels.empty()? std::vector(1, 0) : args.channels;
//shared async queue for all channels in streamer
boost::shared_ptr async_md(new async_md_queue_t(N230_TX_MAX_ASYNC_MESSAGES));
boost::shared_ptr my_streamer;
for (size_t stream_i = 0; stream_i < args.channels.size(); stream_i++)
{
const size_t chan = args.channels[stream_i];
radio_resource_t& perif = _resource_mgr->get_radio(chan);
//setup transport hints (default to a large recv buff)
//TODO: Propagate the device_args class into streamer in the future
device_addr_t device_addr = args.args;
if (not device_addr.has_key("send_buff_size")) {
device_addr["send_buff_size"] = boost::lexical_cast(_dev_args.get_send_buff_size());
}
if (not device_addr.has_key("send_frame_size")) {
device_addr["send_frame_size"] = boost::lexical_cast(_dev_args.get_send_frame_size());
}
if (not device_addr.has_key("num_send_frames")) {
device_addr["num_send_frames"] = boost::lexical_cast(_dev_args.get_num_send_frames());
}
transport::udp_zero_copy::buff_params buff_params_out;
sid_t sid;
zero_copy_if::sptr xport = _resource_mgr->create_transport(
TX_DATA, chan, device_addr, sid, buff_params_out);
//calculate packet size
static const size_t hdr_size = 0
+ vrt::num_vrl_words32*sizeof(uint32_t)
+ vrt::max_if_hdr_words32*sizeof(uint32_t)
//+ sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer
- sizeof(vrt::if_packet_info_t().cid) //no class id ever used
- sizeof(vrt::if_packet_info_t().tsi) //no int time ever used
;
const size_t bpp = xport->get_send_frame_size() - hdr_size;
const size_t bpi = convert::get_bytes_per_item(args.otw_format);
const size_t spp = unsigned(args.args.cast("spp", bpp/bpi));
//make the new streamer given the samples per packet
if (not my_streamer) my_streamer = boost::make_shared(spp);
my_streamer->resize(args.channels.size());
my_streamer->set_vrt_packer(&n230_stream_manager::_cvita_hdr_pack);
//set the converter
uhd::convert::id_type id;
id.input_format = args.cpu_format;
id.num_inputs = 1;
id.output_format = args.otw_format + "_item32_be";
id.num_outputs = 1;
my_streamer->set_converter(id);
perif.deframer->clear();
perif.deframer->setup(args);
perif.duc->setup(args);
//flow control setup
size_t fc_window = _get_tx_flow_control_window(
bpp, device_addr.cast("send_buff_size", _dev_args.get_send_buff_size()));
//In packets
const size_t fc_handle_window = (fc_window / N230_TX_FC_RESPONSE_FREQ);
perif.deframer->configure_flow_control(0/*cycs off*/, fc_handle_window);
boost::shared_ptr fc_cache(new tx_fc_cache_t());
fc_cache->stream_channel = stream_i;
fc_cache->device_channel = chan;
fc_cache->async_queue = async_md;
fc_cache->old_async_queue = _async_md_queue;
tick_rate_retriever_t get_tick_rate_fn = boost::bind(&n230_stream_manager::_get_tick_rate, this);
task::sptr task = task::make(
boost::bind(&n230_stream_manager::_handle_tx_async_msgs,
fc_cache, xport, get_tick_rate_fn));
//Give the streamer a functor to get the send buffer
//get_tx_buff_with_flowctrl is static so bind has no lifetime issues
//xport.send (sptr) is required to add streamer->data-transport lifetime dependency
//task (sptr) is required to add a streamer->async-handler lifetime dependency
my_streamer->set_xport_chan_get_buff(
stream_i,
boost::bind(&n230_stream_manager::_get_tx_buff_with_flowctrl, task, fc_cache, xport, fc_window, _1)
);
//Give the streamer a functor handled received async messages
my_streamer->set_async_receiver(
boost::bind(&async_md_queue_t::pop_with_timed_wait, async_md, _1, _2)
);
my_streamer->set_xport_chan_sid(stream_i, true, sid.get());
my_streamer->set_enable_trailer(false); //TODO not implemented trailer support yet
//Store a weak pointer to prevent a streamer->manager->streamer circular dependency
_tx_streamers[chan] = boost::weak_ptr(my_streamer);
_tx_stream_cached_args[chan] = args;
//Sets tick and samp rates on all streamer
update_tick_rate(_get_tick_rate());
//TODO: Find a way to remove this dependency
property_tree::sptr prop_tree = _tree.lock();
if (prop_tree) {
//TODO: Update this to support multiple motherboards
const fs_path mb_path = "/mboards/0";
prop_tree->access(mb_path / "tx_dsps" / boost::lexical_cast(chan) / "rate" / "value").update();
}
}
update_stream_states();
return my_streamer;
}
/***********************************************************************
* Async Message Receiver
**********************************************************************/
bool n230_stream_manager::recv_async_msg(async_metadata_t &async_metadata, double timeout)
{
return _async_md_queue->pop_with_timed_wait(async_metadata, timeout);
}
/***********************************************************************
* Sample Rate Updaters
**********************************************************************/
void n230_stream_manager::update_rx_samp_rate(const size_t dspno, const double rate)
{
boost::shared_ptr my_streamer =
boost::dynamic_pointer_cast(_rx_streamers[dspno].lock());
if (not my_streamer) return;
my_streamer->set_samp_rate(rate);
const double adj = _resource_mgr->get_radio(dspno).ddc->get_scaling_adjustment();
my_streamer->set_scale_factor(adj);
}
void n230_stream_manager::update_tx_samp_rate(const size_t dspno, const double rate)
{
boost::shared_ptr my_streamer =
boost::dynamic_pointer_cast(_tx_streamers[dspno].lock());
if (not my_streamer) return;
my_streamer->set_samp_rate(rate);
const double adj = _resource_mgr->get_radio(dspno).duc->get_scaling_adjustment();
my_streamer->set_scale_factor(adj);
}
/***********************************************************************
* Tick Rate Updater
**********************************************************************/
void n230_stream_manager::update_tick_rate(const double rate)
{
for (size_t i = 0; i < fpga::NUM_RADIOS; i++) {
radio_resource_t& perif = _resource_mgr->get_radio(i);
boost::shared_ptr my_rx_streamer =
boost::dynamic_pointer_cast(_rx_streamers[i].lock());
if (my_rx_streamer) my_rx_streamer->set_tick_rate(rate);
perif.framer->set_tick_rate(rate);
boost::shared_ptr my_tx_streamer =
boost::dynamic_pointer_cast(_tx_streamers[i].lock());
if (my_tx_streamer) my_tx_streamer->set_tick_rate(rate);
}
}
/***********************************************************************
* Stream State Updater
**********************************************************************/
void n230_stream_manager::update_stream_states()
{
//extract settings from state variables
const bool enb_tx0 = bool(_tx_streamers[0].lock());
const bool enb_rx0 = bool(_rx_streamers[0].lock());
const bool enb_tx1 = bool(_tx_streamers[1].lock());
const bool enb_rx1 = bool(_rx_streamers[1].lock());
fe_state_t fe0_state = NONE_STREAMING;
if (enb_tx0 && enb_rx0) fe0_state = TXRX_STREAMING;
else if (enb_tx0) fe0_state = TX_STREAMING;
else if (enb_rx0) fe0_state = RX_STREAMING;
fe_state_t fe1_state = NONE_STREAMING;
if (enb_tx1 && enb_rx1) fe1_state = TXRX_STREAMING;
else if (enb_tx1) fe1_state = TX_STREAMING;
else if (enb_rx1) fe1_state = RX_STREAMING;
_resource_mgr->get_frontend_ctrl().set_stream_state(fe0_state, fe1_state);
}
size_t n230_stream_manager::_get_rx_flow_control_window(size_t frame_size, size_t sw_buff_size)
{
double sw_buff_max = sw_buff_size * N230_RX_SW_BUFF_FULL_FACTOR;
size_t window_in_pkts = (static_cast(sw_buff_max) / frame_size);
if (window_in_pkts == 0) {
throw uhd::value_error("recv_buff_size must be larger than the recv_frame_size.");
}
return window_in_pkts;
}
void n230_stream_manager::_handle_overflow(const size_t i)
{
boost::shared_ptr my_streamer =
boost::dynamic_pointer_cast(_rx_streamers[i].lock());
if (my_streamer->get_num_channels() == 2) {
//MIMO
//find out if we were in continuous mode before stopping
const bool in_continuous_streaming_mode = _resource_mgr->get_radio(i).framer->in_continuous_streaming_mode();
//stop streaming
my_streamer->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
//restart streaming
if (in_continuous_streaming_mode) {
stream_cmd_t stream_cmd(stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
stream_cmd.stream_now = false;
stream_cmd.time_spec = _resource_mgr->get_radio(i).time->get_time_now() + time_spec_t(0.01);
my_streamer->issue_stream_cmd(stream_cmd);
}
} else {
_resource_mgr->get_radio(i).framer->handle_overflow();
}
}
void n230_stream_manager::_handle_rx_flowctrl(
const sid_t& sid,
zero_copy_if::sptr xport,
boost::shared_ptr fc_cache,
const size_t last_seq)
{
static const size_t RXFC_PACKET_LEN_IN_WORDS = 2;
static const size_t RXFC_CMD_CODE_OFFSET = 0;
static const size_t RXFC_SEQ_NUM_OFFSET = 1;
managed_send_buffer::sptr buff = xport->get_send_buff(0.0);
if (not buff) {
throw uhd::runtime_error("handle_rx_flowctrl timed out getting a send buffer");
}
uint32_t *pkt = buff->cast();
//recover seq32
size_t& seq_sw = fc_cache->last_seq_in;
const size_t seq_hw = seq_sw & HW_SEQ_NUM_MASK;
if (last_seq < seq_hw) seq_sw += (HW_SEQ_NUM_MASK + 1);
seq_sw &= ~HW_SEQ_NUM_MASK;
seq_sw |= last_seq;
//load packet info
vrt::if_packet_info_t packet_info;
packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_CONTEXT;
packet_info.num_payload_words32 = RXFC_PACKET_LEN_IN_WORDS;
packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t);
packet_info.packet_count = seq_sw;
packet_info.sob = false;
packet_info.eob = false;
packet_info.sid = sid.get();
packet_info.has_sid = true;
packet_info.has_cid = false;
packet_info.has_tsi = false;
packet_info.has_tsf = false;
packet_info.has_tlr = false;
//load header
_cvita_hdr_pack(pkt, packet_info);
//load payload
pkt[packet_info.num_header_words32 + RXFC_CMD_CODE_OFFSET] = uhd::htonx(N230_EVENT_CODE_FLOW_CTRL);
pkt[packet_info.num_header_words32 + RXFC_SEQ_NUM_OFFSET] = uhd::htonx(seq_sw);
//send the buffer over the interface
buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32));
}
void n230_stream_manager::_handle_tx_async_msgs(
boost::shared_ptr fc_cache,
zero_copy_if::sptr xport,
tick_rate_retriever_t get_tick_rate)
{
managed_recv_buffer::sptr buff = xport->get_recv_buff();
if (not buff) return;
//extract packet info
vrt::if_packet_info_t if_packet_info;
if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
const uint32_t *packet_buff = buff->cast();
//unpacking can fail
uint32_t (*endian_conv)(uint32_t) = uhd::ntohx;
try {
_cvita_hdr_unpack(packet_buff, if_packet_info);
endian_conv = uhd::ntohx;
} catch(const std::exception &ex) {
UHD_MSG(error) << "Error parsing async message packet: " << ex.what() << std::endl;
return;
}
//fill in the async metadata
async_metadata_t metadata;
load_metadata_from_buff(
endian_conv, metadata, if_packet_info, packet_buff,
get_tick_rate(), fc_cache->stream_channel);
//The FC response and the burst ack are two indicators that the radio
//consumed packets. Use them to update the FC metadata
if (metadata.event_code == N230_EVENT_CODE_FLOW_CTRL or
metadata.event_code == async_metadata_t::EVENT_CODE_BURST_ACK
) {
const size_t seq = metadata.user_payload[0];
fc_cache->seq_queue.push_with_pop_on_full(seq);
}
//FC responses don't propagate up to the user so filter them here
if (metadata.event_code != N230_EVENT_CODE_FLOW_CTRL) {
fc_cache->async_queue->push_with_pop_on_full(metadata);
metadata.channel = fc_cache->device_channel;
fc_cache->old_async_queue->push_with_pop_on_full(metadata);
standard_async_msg_prints(metadata);
}
}
managed_send_buffer::sptr n230_stream_manager::_get_tx_buff_with_flowctrl(
task::sptr /*holds ref*/,
boost::shared_ptr fc_cache,
zero_copy_if::sptr xport,
size_t fc_pkt_window,
const double timeout)
{
while (true)
{
const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - (fc_cache->last_seq_ack & HW_SEQ_NUM_MASK);
if ((delta & HW_SEQ_NUM_MASK) <= fc_pkt_window) break;
const bool ok = fc_cache->seq_queue.pop_with_timed_wait(fc_cache->last_seq_ack, timeout);
if (not ok) return managed_send_buffer::sptr(); //timeout waiting for flow control
}
managed_send_buffer::sptr buff = xport->get_send_buff(timeout);
if (buff) fc_cache->last_seq_out++; //update seq, this will actually be a send
return buff;
}
size_t n230_stream_manager::_get_tx_flow_control_window(
size_t payload_size,
size_t hw_buff_size)
{
size_t window_in_pkts = hw_buff_size / payload_size;
if (window_in_pkts == 0) {
throw uhd::value_error("send_buff_size must be larger than the send_frame_size.");
}
return window_in_pkts;
}
double n230_stream_manager::_get_tick_rate()
{
return _resource_mgr->get_clk_pps_ctrl().get_tick_rate();
}
void n230_stream_manager::_cvita_hdr_unpack(
const uint32_t *packet_buff,
vrt::if_packet_info_t &if_packet_info)
{
if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR;
return vrt::if_hdr_unpack_be(packet_buff, if_packet_info);
}
void n230_stream_manager::_cvita_hdr_pack(
uint32_t *packet_buff,
vrt::if_packet_info_t &if_packet_info)
{
if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR;
return vrt::if_hdr_pack_be(packet_buff, if_packet_info);
}
}}} //namespace