diff options
Diffstat (limited to 'host/lib/usrp/usrp2/io_impl.cpp')
-rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 187 |
1 files changed, 145 insertions, 42 deletions
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index cbc0a0817..c8e4b7096 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -21,11 +21,13 @@ #include <uhd/utils/byteswap.hpp> #include <uhd/utils/thread_priority.hpp> #include <uhd/transport/convert_types.hpp> -#include <uhd/transport/alignment_buffer.hpp> +#include <uhd/transport/bounded_buffer.hpp> #include <boost/format.hpp> #include <boost/bind.hpp> #include <boost/thread.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> #include <iostream> +#include <list> using namespace uhd; using namespace uhd::usrp; @@ -108,16 +110,24 @@ private: * - vrt packet handler states **********************************************************************/ struct usrp2_impl::io_impl{ - typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type; - io_impl(size_t num_recv_frames, size_t send_frame_size, size_t width): + io_impl(size_t send_frame_size, size_t width): packet_handler_recv_state(width), - recv_pirate_booty(alignment_buffer_type::make(num_recv_frames-3, width)), async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/)) { - for (size_t i = 0; i < width; i++) fc_mons.push_back( - flow_control_monitor::sptr(new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size)) - ); + for (size_t i = 0; i < width; i++){ + fc_mons.push_back(flow_control_monitor::sptr( + new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size) + )); + //init empty packet infos + vrt::if_packet_info_t packet_info; + packet_info.packet_count = 0; + packet_info.has_tsi = true; + packet_info.tsi = 0; + packet_info.has_tsf = true; + packet_info.tsf = 0; + prev_infos.push_back(packet_info); + } } ~io_impl(void){ @@ -126,11 +136,6 @@ struct usrp2_impl::io_impl{ recv_pirate_crew.join_all(); } - bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout); - } - bool get_send_buffs( const std::vector<zero_copy_if::sptr> &trans, vrt_packet_handler::managed_send_buffs_t &buffs, @@ -151,6 +156,15 @@ struct usrp2_impl::io_impl{ return true; } + bool get_recv_buffs( + const std::vector<zero_copy_if::sptr> xports, + vrt_packet_handler::managed_recv_buffs_t &buffs, + double timeout + ); + + //previous state for each buffer + std::vector<vrt::if_packet_info_t> prev_infos; + //flow control monitors std::vector<flow_control_monitor::sptr> fc_mons; @@ -162,29 +176,28 @@ struct usrp2_impl::io_impl{ void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t); boost::thread_group recv_pirate_crew; bool recv_pirate_crew_raiding; - alignment_buffer_type::sptr recv_pirate_booty; bounded_buffer<async_metadata_t>::sptr async_msg_fifo; boost::mutex spawn_mutex; }; /*********************************************************************** * Receive Pirate Loop - * - while raiding, loot for recv buffers - * - put booty into the alignment buffer + * - while raiding, loot for message packet + * - update flow control condition count + * - put async message packets into queue **********************************************************************/ void usrp2_impl::io_impl::recv_pirate_loop( - zero_copy_if::sptr zc_if, + zero_copy_if::sptr zc_if_err0, usrp2_mboard_impl::sptr mboard, size_t index ){ set_thread_priority_safe(); recv_pirate_crew_raiding = true; - size_t next_packet_seq = 0; spawn_mutex.unlock(); while(recv_pirate_crew_raiding){ - managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); + managed_recv_buffer::sptr buff = zc_if_err0->get_recv_buff(); if (not buff.get()) continue; //ignore timeout/error buffers try{ @@ -194,26 +207,6 @@ void usrp2_impl::io_impl::recv_pirate_loop( const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>(); vrt::if_hdr_unpack_be(vrt_hdr, if_packet_info); - //handle the rx data stream - if (if_packet_info.sid == usrp2_impl::RECV_SID){ - //handle the packet count / sequence number - if (if_packet_info.packet_count != next_packet_seq){ - //std::cerr << "S" << (if_packet_info.packet_count - next_packet_seq)%16; - std::cerr << "O" << std::flush; //report overflow (drops in the kernel) - } - next_packet_seq = (if_packet_info.packet_count+1)%16; - - //extract the timespec and round to the nearest packet - UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf); - time_spec_t time( - time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() - ); - - //push the packet into the buffer with the new time - recv_pirate_booty->push_with_pop_on_full(buff, time, index); - continue; - } - //handle a tx async report message if (if_packet_info.sid == usrp2_impl::ASYNC_SID and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ @@ -253,11 +246,10 @@ void usrp2_impl::io_impl::recv_pirate_loop( void usrp2_impl::io_init(void){ //the assumption is that all data transports should be identical - const size_t num_recv_frames = _data_transports.front()->get_num_recv_frames(); const size_t send_frame_size = _data_transports.front()->get_send_frame_size(); //create new io impl - _io_impl = UHD_PIMPL_MAKE(io_impl, (num_recv_frames, send_frame_size, _data_transports.size())); + _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports.size())); //TODO temporary fix for weird power up state, remove when FPGA fixed { @@ -276,7 +268,7 @@ void usrp2_impl::io_init(void){ //spawn a new pirate to plunder the recv booty _io_impl->recv_pirate_crew.create_thread(boost::bind( &usrp2_impl::io_impl::recv_pirate_loop, - _io_impl.get(), _data_transports.at(i), + _io_impl.get(), _err0_transports.at(i), _mboards.at(i), i )); //block here until the spawned thread unlocks @@ -328,6 +320,117 @@ size_t usrp2_impl::send( } /*********************************************************************** + * Alignment logic on receive + **********************************************************************/ +static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ + return boost::posix_time::microseconds(long(timeout*1e6)); +} + +static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ + return 1e-6*time_dur.total_microseconds(); +} + +static UHD_INLINE time_spec_t extract_time_spec(const vrt::if_packet_info_t &packet_info){ + return time_spec_t( //assumes has_tsi and has_tsf are true + time_t(packet_info.tsi), size_t(packet_info.tsf), + 100e6 //tick rate does not have to be correct for comparison purposes + ); +} + +static UHD_INLINE void extract_packet_info( + managed_recv_buffer::sptr buff, + vrt::if_packet_info_t &prev_info, + time_spec_t &time, bool &clear +){ + //extract packet info + vrt::if_packet_info_t next_info; + vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), next_info); + + //handle the packet count / sequence number + if ((prev_info.packet_count+1)%16 != next_info.packet_count){ + std::cerr << "O" << std::flush; //report overflow (drops in the kernel) + } + + time = extract_time_spec(next_info); + clear = extract_time_spec(prev_info) > time; + prev_info = next_info; +} + +UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( + const std::vector<zero_copy_if::sptr> xports, + vrt_packet_handler::managed_recv_buffs_t &buffs, + double timeout +){ + if (buffs.size() == 1){ + buffs[0] = xports[0]->get_recv_buff(timeout); + if (buffs[0].get() == NULL) return false; + bool clear; time_spec_t time; //unused variables + //call extract_packet_info to handle printing the overflows + extract_packet_info(buffs[0], this->prev_infos[0], time, clear); + return true; + } + //-------------------- begin alignment logic ---------------------// + boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); + managed_recv_buffer::sptr buff_tmp; + std::list<size_t> _all_indexes, indexes_to_do; + for (size_t i = 0; i < buffs.size(); i++) _all_indexes.push_back(i); + bool clear; + time_spec_t expected_time; + + //respond to a clear by starting from scratch + got_clear: + indexes_to_do = _all_indexes; + clear = false; + + //do an initial pop to load an initial sequence id + size_t index = indexes_to_do.front(); + buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); + if (buff_tmp.get() == NULL) return false; + extract_packet_info(buff_tmp, this->prev_infos[index], expected_time, clear); + if (clear) goto got_clear; + buffs[index] = buff_tmp; + indexes_to_do.pop_front(); + + //get an aligned set of elements from the buffers: + while(indexes_to_do.size() != 0){ + + //pop an element off for this index + index = indexes_to_do.front(); + buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); + if (buff_tmp.get() == NULL) return false; + time_spec_t this_time; + extract_packet_info(buff_tmp, this->prev_infos[index], this_time, clear); + if (clear) goto got_clear; + buffs[index] = buff_tmp; + + //if the sequence id matches: + // remove this index from the list and continue + if (this_time == expected_time){ + indexes_to_do.pop_front(); + continue; + } + + //if the sequence id is older: + // continue with the same index to try again + else if (this_time < expected_time){ + continue; + } + + //if the sequence id is newer: + // use the new expected time for comparison + // add all other indexes back into the list + else{ + expected_time = this_time; + indexes_to_do = _all_indexes; + indexes_to_do.remove(index); + continue; + } + } + return true; + //-------------------- end alignment logic -----------------------// +} + +/*********************************************************************** * Receive Data **********************************************************************/ size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{ @@ -357,7 +460,7 @@ size_t usrp2_impl::recv( io_type, _rx_otw_type, //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_unpack_be, - boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout), + boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _data_transports, _1, timeout), boost::bind(&handle_overflow, _mboards, _1) ); } |