aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp117
1 files changed, 76 insertions, 41 deletions
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index 0c92c33b2..c96528694 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -34,60 +34,81 @@ namespace asio = boost::asio;
/***********************************************************************
* io impl details (internal to this file)
+ * - pirate crew
+ * - alignment buffer
+ * - thread loop
+ * - vrt packet handler states
**********************************************************************/
struct usrp2_impl::io_impl{
typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type;
- io_impl(std::vector<udp_zero_copy::sptr> &zc_ifs);
- ~io_impl(void);
+ io_impl(size_t num_frames, size_t width):
+ packet_handler_recv_state(width),
+ recv_pirate_booty(alignment_buffer_type::make(num_frames, width))
+ {
+ /* NOP */
+ }
+
+ ~io_impl(void){
+ recv_pirate_crew_raiding = false;
+ recv_pirate_crew.interrupt_all();
+ recv_pirate_crew.join_all();
+ }
- bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs);
+ bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(100));
+ }
//state management for the vrt packet handler code
vrt_packet_handler::recv_state packet_handler_recv_state;
vrt_packet_handler::send_state packet_handler_send_state;
- //methods and variables for the recv pirate
- void recv_pirate_loop(zero_copy_if::sptr zc_if, size_t index);
+ //methods and variables for the pirate crew
+ void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);
boost::thread_group recv_pirate_crew;
- bool recv_pirate_crew_running;
+ bool recv_pirate_crew_raiding;
alignment_buffer_type::sptr recv_pirate_booty;
};
-usrp2_impl::io_impl::io_impl(std::vector<udp_zero_copy::sptr> &zc_ifs):
- packet_handler_recv_state(zc_ifs.size())
-{
- //create a large enough booty
- size_t num_frames = zc_ifs.at(0)->get_num_recv_frames();
- std::cout << "Recv pirate num frames: " << num_frames << std::endl;
- recv_pirate_booty = alignment_buffer_type::make(num_frames, zc_ifs.size());
-
- //create a new pirate thread for each zc if (yarr!!)
- for (size_t i = 0; i < zc_ifs.size(); i++){
- recv_pirate_crew.create_thread(
- boost::bind(&usrp2_impl::io_impl::recv_pirate_loop, this, zc_ifs.at(i), i)
- );
- }
-}
-
-usrp2_impl::io_impl::~io_impl(void){
- recv_pirate_crew_running = false;
- recv_pirate_crew.interrupt_all();
- recv_pirate_crew.join_all();
-}
-
-bool usrp2_impl::io_impl::get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(100));
-}
-
-void usrp2_impl::io_impl::recv_pirate_loop(zero_copy_if::sptr zc_if, size_t index){
+/***********************************************************************
+ * Receive Pirate Loop
+ * - while raiding, loot for recv buffers
+ * - put booty into the alignment buffer
+ **********************************************************************/
+void usrp2_impl::io_impl::recv_pirate_loop(
+ zero_copy_if::sptr zc_if,
+ usrp2_mboard_impl::sptr mboard,
+ size_t index
+){
set_thread_priority_safe();
- recv_pirate_crew_running = true;
- while(recv_pirate_crew_running){
+ recv_pirate_crew_raiding = true;
+ while(recv_pirate_crew_raiding){
managed_recv_buffer::sptr buff = zc_if->get_recv_buff();
- //TODO unpack vrt, get time spec, round to nearest packet bound, use below:
- if (buff->size()) recv_pirate_booty->push_with_pop_on_full(buff, time_spec_t(/*todoseq*/), index);
+ if (buff->size() == 0) continue; //ignore timeout buffers
+
+ try{
+ //extract the vrt header packet info
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
+ vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), if_packet_info);
+
+ //extract the timespec and round to the nearest packet
+ UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf);
+
+ //size_t pkt_dur_ticks = mboard->get_master_clock_freq() * 1; //TODO FIXME
+ //size_t(if_packet_info.tsf) - size_t(if_packet_info.tsf)%pkt_dur_ticks
+ //the idea is to round the time specs to packet boundaries to avoid the issue
+ //of never getting alignment when things are not locked properly
+ time_spec_t time(
+ time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq()
+ );
+
+ //push the packet into the buffer with the new time
+ recv_pirate_booty->push_with_pop_on_full(buff, time, index);
+ }catch(const std::exception &e){
+ std::cerr << "Error (usrp2 recv pirate loop): " << e.what() << std::endl;
+ }
}
}
@@ -103,11 +124,25 @@ void usrp2_impl::io_init(void){
send_buff->commit(sizeof(data));
}
- std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl;
- std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl;
+ //the number of recv frames is the number for the first transport
+ //the assumption is that all data transports should be identical
+ size_t num_frames = _data_transports.front()->get_num_recv_frames();
//create new io impl
- _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transports));
+ _io_impl = UHD_PIMPL_MAKE(io_impl, (num_frames, _data_transports.size()));
+
+ //create a new pirate thread for each zc if (yarr!!)
+ for (size_t i = 0; i < _data_transports.size(); i++){
+ _io_impl->recv_pirate_crew.create_thread(boost::bind(
+ &usrp2_impl::io_impl::recv_pirate_loop,
+ _io_impl, _data_transports.at(i),
+ _mboards.at(i), i
+ ));
+ }
+
+ std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl;
+ std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl;
+ std::cout << "Recv pirate num frames: " << num_frames << std::endl;
}
/***********************************************************************