diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 117 |
1 files changed, 76 insertions, 41 deletions
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 0c92c33b2..c96528694 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -34,60 +34,81 @@ namespace asio = boost::asio; /*********************************************************************** * io impl details (internal to this file) + * - pirate crew + * - alignment buffer + * - thread loop + * - vrt packet handler states **********************************************************************/ struct usrp2_impl::io_impl{ typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type; - io_impl(std::vector<udp_zero_copy::sptr> &zc_ifs); - ~io_impl(void); + io_impl(size_t num_frames, size_t width): + packet_handler_recv_state(width), + recv_pirate_booty(alignment_buffer_type::make(num_frames, width)) + { + /* NOP */ + } + + ~io_impl(void){ + recv_pirate_crew_raiding = false; + recv_pirate_crew.interrupt_all(); + recv_pirate_crew.join_all(); + } - bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs); + bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(100)); + } //state management for the vrt packet handler code vrt_packet_handler::recv_state packet_handler_recv_state; vrt_packet_handler::send_state packet_handler_send_state; - //methods and variables for the recv pirate - void recv_pirate_loop(zero_copy_if::sptr zc_if, size_t index); + //methods and variables for the pirate crew + void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t); boost::thread_group recv_pirate_crew; - bool recv_pirate_crew_running; + bool recv_pirate_crew_raiding; alignment_buffer_type::sptr recv_pirate_booty; }; -usrp2_impl::io_impl::io_impl(std::vector<udp_zero_copy::sptr> &zc_ifs): - packet_handler_recv_state(zc_ifs.size()) -{ - //create a large enough booty - size_t num_frames = zc_ifs.at(0)->get_num_recv_frames(); - std::cout << "Recv pirate num frames: " << num_frames << std::endl; - recv_pirate_booty = alignment_buffer_type::make(num_frames, zc_ifs.size()); - - //create a new pirate thread for each zc if (yarr!!) - for (size_t i = 0; i < zc_ifs.size(); i++){ - recv_pirate_crew.create_thread( - boost::bind(&usrp2_impl::io_impl::recv_pirate_loop, this, zc_ifs.at(i), i) - ); - } -} - -usrp2_impl::io_impl::~io_impl(void){ - recv_pirate_crew_running = false; - recv_pirate_crew.interrupt_all(); - recv_pirate_crew.join_all(); -} - -bool usrp2_impl::io_impl::get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(100)); -} - -void usrp2_impl::io_impl::recv_pirate_loop(zero_copy_if::sptr zc_if, size_t index){ +/*********************************************************************** + * Receive Pirate Loop + * - while raiding, loot for recv buffers + * - put booty into the alignment buffer + **********************************************************************/ +void usrp2_impl::io_impl::recv_pirate_loop( + zero_copy_if::sptr zc_if, + usrp2_mboard_impl::sptr mboard, + size_t index +){ set_thread_priority_safe(); - recv_pirate_crew_running = true; - while(recv_pirate_crew_running){ + recv_pirate_crew_raiding = true; + while(recv_pirate_crew_raiding){ managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); - //TODO unpack vrt, get time spec, round to nearest packet bound, use below: - if (buff->size()) recv_pirate_booty->push_with_pop_on_full(buff, time_spec_t(/*todoseq*/), index); + if (buff->size() == 0) continue; //ignore timeout buffers + + try{ + //extract the vrt header packet info + vrt::if_packet_info_t if_packet_info; + if_packet_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t); + vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), if_packet_info); + + //extract the timespec and round to the nearest packet + UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf); + + //size_t pkt_dur_ticks = mboard->get_master_clock_freq() * 1; //TODO FIXME + //size_t(if_packet_info.tsf) - size_t(if_packet_info.tsf)%pkt_dur_ticks + //the idea is to round the time specs to packet boundaries to avoid the issue + //of never getting alignment when things are not locked properly + time_spec_t time( + time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() + ); + + //push the packet into the buffer with the new time + recv_pirate_booty->push_with_pop_on_full(buff, time, index); + }catch(const std::exception &e){ + std::cerr << "Error (usrp2 recv pirate loop): " << e.what() << std::endl; + } } } @@ -103,11 +124,25 @@ void usrp2_impl::io_init(void){ send_buff->commit(sizeof(data)); } - std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; - std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl; + //the number of recv frames is the number for the first transport + //the assumption is that all data transports should be identical + size_t num_frames = _data_transports.front()->get_num_recv_frames(); //create new io impl - _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transports)); + _io_impl = UHD_PIMPL_MAKE(io_impl, (num_frames, _data_transports.size())); + + //create a new pirate thread for each zc if (yarr!!) + for (size_t i = 0; i < _data_transports.size(); i++){ + _io_impl->recv_pirate_crew.create_thread(boost::bind( + &usrp2_impl::io_impl::recv_pirate_loop, + _io_impl, _data_transports.at(i), + _mboards.at(i), i + )); + } + + std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; + std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl; + std::cout << "Recv pirate num frames: " << num_frames << std::endl; } /*********************************************************************** |