diff options
Diffstat (limited to 'host/lib')
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 117 | 
1 files changed, 76 insertions, 41 deletions
| diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 0c92c33b2..c96528694 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -34,60 +34,81 @@ namespace asio = boost::asio;  /***********************************************************************   * io impl details (internal to this file) + * - pirate crew + * - alignment buffer + * - thread loop + * - vrt packet handler states   **********************************************************************/  struct usrp2_impl::io_impl{      typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type; -    io_impl(std::vector<udp_zero_copy::sptr> &zc_ifs); -    ~io_impl(void); +    io_impl(size_t num_frames, size_t width): +        packet_handler_recv_state(width), +        recv_pirate_booty(alignment_buffer_type::make(num_frames, width)) +    { +        /* NOP */ +    } + +    ~io_impl(void){ +        recv_pirate_crew_raiding = false; +        recv_pirate_crew.interrupt_all(); +        recv_pirate_crew.join_all(); +    } -    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs); +    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(100)); +    }      //state management for the vrt packet handler code      vrt_packet_handler::recv_state packet_handler_recv_state;      vrt_packet_handler::send_state packet_handler_send_state; -    //methods and variables for the recv pirate -    void recv_pirate_loop(zero_copy_if::sptr zc_if, size_t index); +    //methods and variables for the pirate crew +    void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);      boost::thread_group recv_pirate_crew; -    bool recv_pirate_crew_running; +    bool recv_pirate_crew_raiding;      alignment_buffer_type::sptr recv_pirate_booty;  }; -usrp2_impl::io_impl::io_impl(std::vector<udp_zero_copy::sptr> &zc_ifs): -    packet_handler_recv_state(zc_ifs.size()) -{ -    //create a large enough booty -    size_t num_frames = zc_ifs.at(0)->get_num_recv_frames(); -    std::cout << "Recv pirate num frames: " << num_frames << std::endl; -    recv_pirate_booty = alignment_buffer_type::make(num_frames, zc_ifs.size()); - -    //create a new pirate thread for each zc if (yarr!!) -    for (size_t i = 0; i < zc_ifs.size(); i++){ -        recv_pirate_crew.create_thread( -            boost::bind(&usrp2_impl::io_impl::recv_pirate_loop, this, zc_ifs.at(i), i) -        ); -    } -} - -usrp2_impl::io_impl::~io_impl(void){ -    recv_pirate_crew_running = false; -    recv_pirate_crew.interrupt_all(); -    recv_pirate_crew.join_all(); -} - -bool usrp2_impl::io_impl::get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs){ -    boost::this_thread::disable_interruption di; //disable because the wait can throw -    return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(100)); -} - -void usrp2_impl::io_impl::recv_pirate_loop(zero_copy_if::sptr zc_if, size_t index){ +/*********************************************************************** + * Receive Pirate Loop + * - while raiding, loot for recv buffers + * - put booty into the alignment buffer + **********************************************************************/ +void usrp2_impl::io_impl::recv_pirate_loop( +    zero_copy_if::sptr zc_if, +    usrp2_mboard_impl::sptr mboard, +    size_t index +){      set_thread_priority_safe(); -    recv_pirate_crew_running = true; -    while(recv_pirate_crew_running){ +    recv_pirate_crew_raiding = true; +    while(recv_pirate_crew_raiding){          managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); -        //TODO unpack vrt, get time spec, round to nearest packet bound, use below: -        if (buff->size()) recv_pirate_booty->push_with_pop_on_full(buff, time_spec_t(/*todoseq*/), index); +        if (buff->size() == 0) continue; //ignore timeout buffers + +        try{ +            //extract the vrt header packet info +            vrt::if_packet_info_t if_packet_info; +            if_packet_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t); +            vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), if_packet_info); + +            //extract the timespec and round to the nearest packet +            UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf); + +            //size_t pkt_dur_ticks = mboard->get_master_clock_freq() * 1; //TODO FIXME +            //size_t(if_packet_info.tsf) - size_t(if_packet_info.tsf)%pkt_dur_ticks +            //the idea is to round the time specs to packet boundaries to avoid the issue +            //of never getting alignment when things are not locked properly +            time_spec_t time( +                time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() +            ); + +            //push the packet into the buffer with the new time +            recv_pirate_booty->push_with_pop_on_full(buff, time, index); +        }catch(const std::exception &e){ +            std::cerr << "Error (usrp2 recv pirate loop): " << e.what() << std::endl; +        }      }  } @@ -103,11 +124,25 @@ void usrp2_impl::io_init(void){          send_buff->commit(sizeof(data));      } -    std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; -    std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl; +    //the number of recv frames is the number for the first transport +    //the assumption is that all data transports should be identical +    size_t num_frames = _data_transports.front()->get_num_recv_frames();      //create new io impl -    _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transports)); +    _io_impl = UHD_PIMPL_MAKE(io_impl, (num_frames, _data_transports.size())); + +    //create a new pirate thread for each zc if (yarr!!) +    for (size_t i = 0; i < _data_transports.size(); i++){ +        _io_impl->recv_pirate_crew.create_thread(boost::bind( +            &usrp2_impl::io_impl::recv_pirate_loop, +            _io_impl, _data_transports.at(i), +            _mboards.at(i), i +        )); +    } + +    std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; +    std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl; +    std::cout << "Recv pirate num frames: " << num_frames << std::endl;  }  /*********************************************************************** | 
