aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/e100/io_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/usrp/e100/io_impl.cpp')
-rw-r--r--host/lib/usrp/e100/io_impl.cpp218
1 files changed, 124 insertions, 94 deletions
diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp
index dc0d959fa..a8d3ad015 100644
--- a/host/lib/usrp/e100/io_impl.cpp
+++ b/host/lib/usrp/e100/io_impl.cpp
@@ -17,6 +17,7 @@
#include "../../transport/super_recv_packet_handler.hpp"
#include "../../transport/super_send_packet_handler.hpp"
+#include <linux/usrp_e.h> //ioctl structures and constants
#include "e100_impl.hpp"
#include "e100_regs.hpp"
#include <uhd/utils/msg.hpp>
@@ -29,32 +30,16 @@
#include <boost/format.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
+#include <poll.h> //poll
+#include <fcntl.h> //open, close
#include <sstream>
+#include <fstream>
using namespace uhd;
using namespace uhd::usrp;
using namespace uhd::transport;
/***********************************************************************
- * Helpers
- **********************************************************************/
-#if 1
-# define debug_print_buff(...)
-#else
-static void debug_print_buff(const std::string &what, managed_recv_buffer::sptr buff){
- std::ostringstream ss;
- ss << boost::format(
- "This is a %s packet, %u bytes.\n"
- ) % what % buff->size();
- for (size_t i = 0; i < 9; i++){
- ss << boost::format(" buff[%u] = 0x%08x\n") % i % buff->cast<const boost::uint32_t *>()[i];
- }
- ss << std::endl << std::endl;
- UHD_MSG(status) << ss.str();
-}
-#endif
-
-/***********************************************************************
* io impl details (internal to this file)
* - pirate crew of 1
* - bounded buffer
@@ -63,34 +48,52 @@ static void debug_print_buff(const std::string &what, managed_recv_buffer::sptr
**********************************************************************/
struct e100_impl::io_impl{
io_impl(zero_copy_if::sptr &xport):
- data_xport(xport),
+ data_transport(xport),
async_msg_fifo(100/*messages deep*/)
{
for (size_t i = 0; i < E100_NUM_RX_DSPS; i++){
- typedef bounded_buffer<managed_recv_buffer::sptr> booty_type;
- recv_pirate_booty.push_back(new booty_type(data_xport->get_num_recv_frames()));
+ typedef bounded_buffer<managed_recv_buffer::sptr> buffs_queue_type;
+ _buffs_queue.push_back(new buffs_queue_type(data_transport->get_num_recv_frames()));
}
}
~io_impl(void){
recv_pirate_crew.interrupt_all();
recv_pirate_crew.join_all();
- for (size_t i = 0; i < recv_pirate_booty.size(); i++){
- delete recv_pirate_booty[i];
+ for (size_t i = 0; i < _buffs_queue.size(); i++){
+ delete _buffs_queue[i];
}
}
- managed_recv_buffer::sptr get_recv_buff(const size_t index, double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- managed_recv_buffer::sptr buff;
- recv_pirate_booty[index]->pop_with_timed_wait(buff, timeout);
- return buff; //ASSUME buff == NULL when pop times-out
+ std::vector<bounded_buffer<managed_recv_buffer::sptr> *> _buffs_queue;
+
+ //gets buffer, determines if its the requested index,
+ //and either queues the buffer or returns the buffer
+ managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){
+ while (true){
+ managed_recv_buffer::sptr buff;
+
+ //attempt to pop a buffer from the queue
+ if (_buffs_queue[index]->pop_with_haste(buff)) return buff;
+
+ //otherwise, call into the transport
+ buff = data_transport->get_recv_buff(timeout);
+ if (buff.get() == NULL) return buff; //timeout
+
+ //check the stream id to know which channel
+ const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();
+ const size_t rx_index = uhd::wtohx(vrt_hdr[1]) - E100_DSP_SID_BASE;
+ if (rx_index == index) return buff; //got expected message
+
+ //otherwise queue and try again
+ if (rx_index < E100_NUM_RX_DSPS) _buffs_queue[rx_index]->push_with_pop_on_full(buff);
+ else UHD_MSG(error) << "Got a data packet with known SID " << uhd::wtohx(vrt_hdr[1]) << std::endl;
+ }
}
//The data transport is listed first so that it is deconstructed last,
//which is after the states and booty which may hold managed buffers.
- //This comment is invalid because its now a reference and not stored here.
- zero_copy_if::sptr &data_xport;
+ zero_copy_if::sptr data_transport;
//state management for the vrt packet handler code
sph::recv_packet_handler recv_handler;
@@ -98,72 +101,35 @@ struct e100_impl::io_impl{
bool continuous_streaming;
//a pirate's life is the life for me!
- void recv_pirate_loop(boost::barrier &, e100_clock_ctrl::sptr);
- std::vector<bounded_buffer<managed_recv_buffer::sptr> *> recv_pirate_booty;
- bounded_buffer<async_metadata_t> async_msg_fifo;
- boost::thread_group recv_pirate_crew;
-};
+ void recv_pirate_loop(
+ boost::barrier &spawn_barrier,
+ const boost::function<void(void)> &handle,
+ e100_iface::sptr //keep a sptr to iface which shares gpio145
+ ){
+ spawn_barrier.wait();
+ UHD_LOG << "pirate loop spawned" << std::endl;
-/***********************************************************************
- * Receive Pirate Loop
- * - while raiding, loot for recv buffers
- * - put booty into the alignment buffer
- **********************************************************************/
-void e100_impl::io_impl::recv_pirate_loop(
- boost::barrier &spawn_barrier, e100_clock_ctrl::sptr clock_ctrl
-){
- spawn_barrier.wait();
- set_thread_priority_safe();
-
- while (not boost::this_thread::interruption_requested()){
- managed_recv_buffer::sptr buff = this->data_xport->get_recv_buff();
- if (buff.get() == NULL) continue; //ignore timeout buffers
-
- //handle an rx data packet or inline message
- const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();
- const size_t rx_index = uhd::wtohx(vrt_hdr[1]) - E100_DSP_SID_BASE;
- if (rx_index < E100_NUM_RX_DSPS){
- debug_print_buff("data", buff);
- recv_pirate_booty[rx_index]->push_with_wait(buff);
- continue;
- }
+ //open the GPIO and set it up for an IRQ
+ std::ofstream edge_file("/sys/class/gpio/gpio145/edge");
+ edge_file << "rising" << std::endl << std::flush;
+ edge_file.close();
+ int fd = ::open("/sys/class/gpio/gpio145/value", O_RDONLY);
- //otherwise, unpack the vrt header and process below...
- vrt::if_packet_info_t if_packet_info;
- if_packet_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
- try{vrt::if_hdr_unpack_le(vrt_hdr, if_packet_info);}
- catch(const std::exception &e){
- UHD_MSG(error) << "Error unpacking vrt header:\n" << e.what() << std::endl;
- continue;
+ while (not boost::this_thread::interruption_requested()){
+ pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLPRI | POLLERR;
+ ssize_t ret = ::poll(&pfd, 1, 100/*ms*/);
+ if (ret > 0) handle();
}
- //handle a tx async report message
- if (if_packet_info.sid == E100_ASYNC_SID and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
- debug_print_buff("async", buff);
-
- //fill in the async metadata
- async_metadata_t metadata;
- metadata.channel = 0;
- metadata.has_time_spec = if_packet_info.has_tsi and if_packet_info.has_tsf;
- metadata.time_spec = time_spec_t(
- time_t(if_packet_info.tsi), long(if_packet_info.tsf), clock_ctrl->get_fpga_clock_rate()
- );
- metadata.event_code = async_metadata_t::event_code_t(sph::get_context_code(vrt_hdr, if_packet_info));
- async_msg_fifo.push_with_pop_on_full(metadata);
- if (metadata.event_code &
- ( async_metadata_t::EVENT_CODE_UNDERFLOW
- | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET)
- ) UHD_MSG(fastpath) << "U";
- else if (metadata.event_code &
- ( async_metadata_t::EVENT_CODE_SEQ_ERROR
- | async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST)
- ) UHD_MSG(fastpath) << "S";
- continue;
- }
+ ::close(fd);
- debug_print_buff("unknown", buff);
+ UHD_LOG << "pirate loop done" << std::endl;
}
-}
+ bounded_buffer<async_metadata_t> async_msg_fifo;
+ boost::thread_group recv_pirate_crew;
+};
/***********************************************************************
* Helper Functions
@@ -171,23 +137,87 @@ void e100_impl::io_impl::recv_pirate_loop(
void e100_impl::io_init(void){
//setup before the registers (transport called to calculate max spp)
- _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_xport));
+ _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport));
//clear state machines
_iface->poke32(E100_REG_CLEAR_RX, 0);
_iface->poke32(E100_REG_CLEAR_TX, 0);
+ //prepare the async msg buffer for incoming messages
+ _iface->poke32(E100_REG_SR_ERR_CTRL, 1 << 0); //clear
+ while ((_iface->peek32(E100_REG_RB_ERR_STATUS) & (1 << 2)) == 0){} //wait for idle
+ _iface->poke32(E100_REG_SR_ERR_CTRL, 1 << 1); //start
+
//spawn a pirate, yarrr!
boost::barrier spawn_barrier(2);
+ boost::function<void(void)> handle_irq_cb = boost::bind(&e100_impl::handle_irq, this);
_io_impl->recv_pirate_crew.create_thread(boost::bind(
&e100_impl::io_impl::recv_pirate_loop, _io_impl.get(),
- boost::ref(spawn_barrier), _clock_ctrl
+ boost::ref(spawn_barrier), handle_irq_cb, _iface
));
spawn_barrier.wait();
//update mapping here since it didnt b4 when io init not called first
update_xport_channel_mapping();
}
+void e100_impl::handle_irq(void){
+ //check the status of the async msg buffer
+ const boost::uint32_t status = _iface->peek32(E100_REG_RB_ERR_STATUS);
+ if ((status & 0x3) == 0) return; //not done or error
+ //std::cout << boost::format("status: 0x%x") % status << std::endl;
+
+ //load the data struct and call the ioctl
+ usrp_e_ctl32 data;
+ data.offset = E100_REG_ERR_BUFF;
+ data.count = status >> 16;
+ //FIXME ioctl reads words32 incorrectly _iface->ioctl(USRP_E_READ_CTL32, &data);
+ for (size_t i = 0; i < data.count; i++){
+ data.buf[i] = _iface->peek32(E100_REG_ERR_BUFF + i*sizeof(boost::uint32_t));
+ //std::cout << boost::format(" buff[%u] = 0x%08x\n") % i % data.buf[i];
+ }
+
+ //unpack the vrt header and process below...
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.num_packet_words32 = data.count;
+ try{vrt::if_hdr_unpack_le(data.buf, if_packet_info);}
+ catch(const std::exception &e){
+ UHD_MSG(error) << "Error unpacking vrt header:\n" << e.what() << std::endl;
+ goto prepare;
+ }
+
+ //handle a tx async report message
+ if (if_packet_info.sid == E100_ASYNC_SID and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
+
+ //fill in the async metadata
+ async_metadata_t metadata;
+ metadata.channel = 0;
+ metadata.has_time_spec = if_packet_info.has_tsi and if_packet_info.has_tsf;
+ metadata.time_spec = time_spec_t(
+ time_t(if_packet_info.tsi), long(if_packet_info.tsf), _clock_ctrl->get_fpga_clock_rate()
+ );
+ metadata.event_code = async_metadata_t::event_code_t(sph::get_context_code(data.buf, if_packet_info));
+
+ //push the message onto the queue
+ _io_impl->async_msg_fifo.push_with_pop_on_full(metadata);
+
+ //print some fastpath messages
+ if (metadata.event_code &
+ ( async_metadata_t::EVENT_CODE_UNDERFLOW
+ | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET)
+ ) UHD_MSG(fastpath) << "U";
+ else if (metadata.event_code &
+ ( async_metadata_t::EVENT_CODE_SEQ_ERROR
+ | async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST)
+ ) UHD_MSG(fastpath) << "S";
+ }
+
+ //prepare for the next round
+ prepare:
+ _iface->poke32(E100_REG_SR_ERR_CTRL, 1 << 0); //clear
+ while ((_iface->peek32(E100_REG_RB_ERR_STATUS) & (1 << 2)) == 0){} //wait for idle
+ _iface->poke32(E100_REG_SR_ERR_CTRL, 1 << 1); //start
+}
+
void e100_impl::update_xport_channel_mapping(void){
if (_io_impl.get() == NULL) return; //not inited yet
@@ -217,7 +247,7 @@ void e100_impl::update_xport_channel_mapping(void){
_io_impl->send_handler.set_samp_rate(_tx_dsp_proxies[_tx_dsp_proxies.keys().at(0)]->get_link()[DSP_PROP_HOST_RATE].as<double>());
for (size_t chan = 0; chan < _io_impl->send_handler.size(); chan++){
_io_impl->send_handler.set_xport_chan_get_buff(chan, boost::bind(
- &uhd::transport::zero_copy_if::get_send_buff, _io_impl->data_xport, _1
+ &uhd::transport::zero_copy_if::get_send_buff, _io_impl->data_transport, _1
));
}
_io_impl->send_handler.set_converter(_send_otw_type);