//
// Copyright 2012-2016 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//
#include "ctrl_iface.hpp"
#include "async_packet_handler.hpp"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::transport;
static const double ACK_TIMEOUT = 2.0; //supposed to be worst case practical timeout
static const double MASSIVE_TIMEOUT = 10.0; //for when we wait on a timed command
static const size_t SR_READBACK = 32;
ctrl_iface::~ctrl_iface(void){
/* NOP */
}
class ctrl_iface_impl: public ctrl_iface
{
public:
ctrl_iface_impl(const bool big_endian,
uhd::transport::zero_copy_if::sptr ctrl_xport,
uhd::transport::zero_copy_if::sptr resp_xport,
const uint32_t sid, const std::string &name
) :
_link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR),
_packet_type(vrt::if_packet_info_t::PACKET_TYPE_CONTEXT),
_bige(big_endian),
_ctrl_xport(ctrl_xport), _resp_xport(resp_xport),
_sid(sid),
_name(name),
_seq_out(0),
_timeout(ACK_TIMEOUT),
_resp_queue(128/*max response msgs*/),
_resp_queue_size(_resp_xport ? _resp_xport->get_num_recv_frames() : 3),
_rb_address(uhd::rfnoc::SR_READBACK)
{
if (resp_xport) {
while (resp_xport->get_recv_buff(0.0)) {} //flush
}
this->set_time(uhd::time_spec_t(0.0));
this->set_tick_rate(1.0); //something possible but bogus
}
~ctrl_iface_impl(void)
{
_timeout = ACK_TIMEOUT; //reset timeout to something small
UHD_SAFE_CALL(
this->peek32(0);//dummy peek with the purpose of ack'ing all packets
_async_task.reset();//now its ok to release the task
)
}
/*******************************************************************
* Peek and poke 32 bit implementation
******************************************************************/
void poke32(const wb_addr_type addr, const uint32_t data)
{
boost::mutex::scoped_lock lock(_mutex);
this->send_pkt(addr/4, data);
this->wait_for_ack(false);
}
uint32_t peek32(const wb_addr_type addr)
{
boost::mutex::scoped_lock lock(_mutex);
this->send_pkt(_rb_address, addr/8);
const uint64_t res = this->wait_for_ack(true);
const uint32_t lo = uint32_t(res & 0xffffffff);
const uint32_t hi = uint32_t(res >> 32);
return ((addr/4) & 0x1)? hi : lo;
}
uint64_t peek64(const wb_addr_type addr)
{
boost::mutex::scoped_lock lock(_mutex);
this->send_pkt(_rb_address, addr/8);
return this->wait_for_ack(true);
}
/*******************************************************************
* Update methods for time
******************************************************************/
void set_time(const uhd::time_spec_t &time)
{
boost::mutex::scoped_lock lock(_mutex);
_time = time;
_use_time = _time != uhd::time_spec_t(0.0);
if (_use_time) _timeout = MASSIVE_TIMEOUT; //permanently sets larger timeout
}
uhd::time_spec_t get_time(void)
{
boost::mutex::scoped_lock lock(_mutex);
return _time;
}
void set_tick_rate(const double rate)
{
boost::mutex::scoped_lock lock(_mutex);
_tick_rate = rate;
}
private:
// This is the buffer type for response messages
struct resp_buff_type
{
uint32_t data[8];
};
/*******************************************************************
* Primary control and interaction private methods
******************************************************************/
inline void send_pkt(const uint32_t addr, const uint32_t data = 0)
{
managed_send_buffer::sptr buff = _ctrl_xport->get_send_buff(0.0);
if (not buff) {
throw uhd::runtime_error("fifo ctrl timed out getting a send buffer");
}
uint32_t *pkt = buff->cast();
//load packet info
vrt::if_packet_info_t packet_info;
packet_info.link_type = _link_type;
packet_info.packet_type = _packet_type;
packet_info.num_payload_words32 = 2;
packet_info.num_payload_bytes = packet_info.num_payload_words32*sizeof(uint32_t);
packet_info.packet_count = _seq_out;
packet_info.tsf = _time.to_ticks(_tick_rate);
packet_info.sob = false;
packet_info.eob = false;
packet_info.sid = _sid;
packet_info.has_sid = true;
packet_info.has_cid = false;
packet_info.has_tsi = false;
packet_info.has_tsf = _use_time;
packet_info.has_tlr = false;
//load header
if (_bige) vrt::if_hdr_pack_be(pkt, packet_info);
else vrt::if_hdr_pack_le(pkt, packet_info);
//load payload
pkt[packet_info.num_header_words32+0] = (_bige)? uhd::htonx(addr) : uhd::htowx(addr);
pkt[packet_info.num_header_words32+1] = (_bige)? uhd::htonx(data) : uhd::htowx(data);
//UHD_MSG(status) << boost::format("0x%08x, 0x%08x\n") % addr % data;
//send the buffer over the interface
_outstanding_seqs.push(_seq_out);
buff->commit(sizeof(uint32_t)*(packet_info.num_packet_words32));
_seq_out++;//inc seq for next call
}
UHD_INLINE uint64_t wait_for_ack(const bool readback)
{
while (readback or (_outstanding_seqs.size() >= _resp_queue_size))
{
//get seq to ack from outstanding packets list
UHD_ASSERT_THROW(not _outstanding_seqs.empty());
const size_t seq_to_ack = _outstanding_seqs.front();
_outstanding_seqs.pop();
//parse the packet
vrt::if_packet_info_t packet_info;
resp_buff_type resp_buff;
memset(&resp_buff, 0x00, sizeof(resp_buff));
uint32_t const *pkt = NULL;
managed_recv_buffer::sptr buff;
//get buffer from response endpoint - or die in timeout
if (_resp_xport)
{
buff = _resp_xport->get_recv_buff(_timeout);
try
{
UHD_ASSERT_THROW(bool(buff));
UHD_ASSERT_THROW(buff->size() > 0);
}
catch(const std::exception &ex)
{
throw uhd::io_error(str(boost::format("Block ctrl (%s) no response packet - %s") % _name % ex.what()));
}
pkt = buff->cast();
packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
}
//get buffer from response endpoint - or die in timeout
else
{
/*
* Couldn't get message with haste.
* Now check both possible queues for messages.
* Messages should come in on _resp_queue,
* but could end up in dump_queue.
* If we don't get a message --> Die in timeout.
*/
double accum_timeout = 0.0;
const double short_timeout = 0.005; // == 5ms
while(not ((_resp_queue.pop_with_haste(resp_buff))
|| (check_dump_queue(resp_buff))
|| (_resp_queue.pop_with_timed_wait(resp_buff, short_timeout))
)){
/*
* If a message couldn't be received within a given timeout
* --> throw AssertionError!
*/
accum_timeout += short_timeout;
UHD_ASSERT_THROW(accum_timeout < _timeout);
}
pkt = resp_buff.data;
packet_info.num_packet_words32 = sizeof(resp_buff)/sizeof(uint32_t);
}
//parse the buffer
try
{
packet_info.link_type = _link_type;
if (_bige) vrt::chdr::if_hdr_unpack_be(pkt, packet_info);
else vrt::chdr::if_hdr_unpack_le(pkt, packet_info);
}
catch(const std::exception &ex)
{
UHD_MSG(error) << "[" << _name << "] Block ctrl bad VITA packet: " << ex.what() << std::endl;
if (buff){
UHD_MSG(status) << boost::format("%08X") % pkt[0] << std::endl;
UHD_MSG(status) << boost::format("%08X") % pkt[1] << std::endl;
UHD_MSG(status) << boost::format("%08X") % pkt[2] << std::endl;
UHD_MSG(status) << boost::format("%08X") % pkt[3] << std::endl;
}
else{
UHD_MSG(status) << "buff is NULL" << std::endl;
}
}
//check the buffer
try
{
UHD_ASSERT_THROW(packet_info.has_sid);
if (packet_info.sid != uint32_t((_sid >> 16) | (_sid << 16))) {
throw uhd::io_error(
str(
boost::format("Expected SID: %s Received SID: %s")
% uhd::sid_t(_sid).reversed().to_pp_string_hex()
% uhd::sid_t(packet_info.sid).to_pp_string_hex()
)
);
}
if (packet_info.packet_count != (seq_to_ack & 0xfff)) {
throw uhd::io_error(
str(
boost::format("Expected packet index: %d Received index: %d")
% packet_info.packet_count
% (seq_to_ack & 0xfff)
)
);
}
UHD_ASSERT_THROW(packet_info.num_payload_words32 == 2);
//UHD_ASSERT_THROW(packet_info.packet_type == _packet_type);
}
catch(const std::exception &ex)
{
throw uhd::io_error(str(boost::format("Block ctrl (%s) packet parse error - %s") % _name % ex.what()));
}
//return the readback value
if (readback and _outstanding_seqs.empty())
{
const uint64_t hi = (_bige)? uhd::ntohx(pkt[packet_info.num_header_words32+0]) : uhd::wtohx(pkt[packet_info.num_header_words32+0]);
const uint64_t lo = (_bige)? uhd::ntohx(pkt[packet_info.num_header_words32+1]) : uhd::wtohx(pkt[packet_info.num_header_words32+1]);
return ((hi << 32) | lo);
}
}
return 0;
}
/*
* If ctrl_core waits for a message that didn't arrive it can search for it in the dump queue.
* This actually happens during shutdown.
* handle_async_task can't access queue anymore thus it returns the corresponding message.
* msg_task class implements a dump_queue to store such messages.
* With check_dump_queue we can check if a message we are waiting for got stranded there.
* If a message got stuck we get it here and push it onto our own message_queue.
*/
bool check_dump_queue(resp_buff_type& b) {
const size_t min_buff_size = 8; // Same value as in b200_io_impl->handle_async_task
uint32_t recv_sid = (((_sid)<<16)|((_sid)>>16));
uhd::msg_task::msg_payload_t msg;
do{
msg = _async_task->get_msg_from_dump_queue(recv_sid);
}
while(msg.size() < min_buff_size && msg.size() != 0);
if(msg.size() >= min_buff_size) {
memcpy(b.data, &msg.front(), std::min(msg.size(), sizeof(b.data)));
return true;
}
return false;
}
void push_response(const uint32_t *buff)
{
resp_buff_type resp_buff;
std::memcpy(resp_buff.data, buff, sizeof(resp_buff));
_resp_queue.push_with_haste(resp_buff);
}
void hold_task(uhd::msg_task::sptr task)
{
_async_task = task;
}
const vrt::if_packet_info_t::link_type_t _link_type;
const vrt::if_packet_info_t::packet_type_t _packet_type;
const bool _bige;
const uhd::transport::zero_copy_if::sptr _ctrl_xport;
const uhd::transport::zero_copy_if::sptr _resp_xport;
uhd::msg_task::sptr _async_task;
const uint32_t _sid;
const std::string _name;
boost::mutex _mutex;
size_t _seq_out;
uhd::time_spec_t _time;
bool _use_time;
double _tick_rate;
double _timeout;
std::queue _outstanding_seqs;
bounded_buffer _resp_queue;
const size_t _resp_queue_size;
const size_t _rb_address;
};
ctrl_iface::sptr ctrl_iface::make(
const bool big_endian,
zero_copy_if::sptr ctrl_xport,
zero_copy_if::sptr resp_xport,
const uint32_t sid,
const std::string &name
) {
return sptr(new ctrl_iface_impl(
big_endian, ctrl_xport, resp_xport, sid, name
));
}