diff options
Diffstat (limited to 'host/lib/usrp/usrp2/io_impl.cpp')
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 237 | 
1 files changed, 237 insertions, 0 deletions
| diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp new file mode 100644 index 000000000..9e29edd82 --- /dev/null +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -0,0 +1,237 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include "../../transport/vrt_packet_handler.hpp" +#include "usrp2_impl.hpp" +#include "usrp2_regs.hpp" +#include <uhd/utils/thread_priority.hpp> +#include <uhd/transport/convert_types.hpp> +#include <uhd/transport/alignment_buffer.hpp> +#include <boost/format.hpp> +#include <boost/asio.hpp> //htonl and ntohl +#include <boost/bind.hpp> +#include <boost/thread.hpp> +#include <iostream> + +using namespace uhd; +using namespace uhd::usrp; +using namespace uhd::transport; +namespace asio = boost::asio; + +static const int underflow_flags = async_metadata_t::EVENT_CODE_UNDERFLOW | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET; + +/*********************************************************************** + * io impl details (internal to this file) + * - pirate crew + * - alignment buffer + * - thread loop + * - vrt packet handler states + **********************************************************************/ +struct usrp2_impl::io_impl{ +    typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type; + +    io_impl(size_t num_frames, size_t width): +        packet_handler_recv_state(width), +        recv_pirate_booty(alignment_buffer_type::make(num_frames, width)), +        async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/)) +    { +        /* NOP */ +    } + +    ~io_impl(void){ +        recv_pirate_crew_raiding = false; +        recv_pirate_crew.interrupt_all(); +        recv_pirate_crew.join_all(); +    } + +    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, size_t timeout_ms){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(timeout_ms)); +    } + +    //state management for the vrt packet handler code +    vrt_packet_handler::recv_state packet_handler_recv_state; +    vrt_packet_handler::send_state packet_handler_send_state; + +    //methods and variables for the pirate crew +    void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t); +    boost::thread_group recv_pirate_crew; +    bool recv_pirate_crew_raiding; +    alignment_buffer_type::sptr recv_pirate_booty; +    bounded_buffer<async_metadata_t>::sptr async_msg_fifo; +}; + +/*********************************************************************** + * Receive Pirate Loop + * - while raiding, loot for recv buffers + * - put booty into the alignment buffer + **********************************************************************/ +void usrp2_impl::io_impl::recv_pirate_loop( +    zero_copy_if::sptr zc_if, +    usrp2_mboard_impl::sptr mboard, +    size_t index +){ +    set_thread_priority_safe(); +    recv_pirate_crew_raiding = true; +    size_t next_packet_seq = 0; + +    while(recv_pirate_crew_raiding){ +        managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); +        if (not buff.get()) continue; //ignore timeout/error buffers + +        try{ +            //extract the vrt header packet info +            vrt::if_packet_info_t if_packet_info; +            if_packet_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t); +            const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>(); +            vrt::if_hdr_unpack_be(vrt_hdr, if_packet_info); + +            //handle a tx async report message +            if (if_packet_info.sid == 1 and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ + +                //fill in the async metadata +                async_metadata_t metadata; +                metadata.channel = index; +                metadata.has_time_spec = if_packet_info.has_tsi and if_packet_info.has_tsf; +                metadata.time_spec = time_spec_t( +                    time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() +                ); +                metadata.event_code = vrt_packet_handler::get_context_code<async_metadata_t::event_code_t>(vrt_hdr, if_packet_info); + +                //print the famous U, and push the metadata into the message queue +                if (metadata.event_code & underflow_flags) std::cerr << "U"; +                async_msg_fifo->push_with_pop_on_full(metadata); +                continue; +            } + +            //handle the packet count / sequence number +            if (if_packet_info.packet_count != next_packet_seq){ +                //std::cerr << "S" << (if_packet_info.packet_count - next_packet_seq)%16; +                std::cerr << "O"; //report overflow (drops in the kernel) +            } +            next_packet_seq = (if_packet_info.packet_count+1)%16; + +            //extract the timespec and round to the nearest packet +            UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf); +            time_spec_t time( +                time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() +            ); + +            //push the packet into the buffer with the new time +            recv_pirate_booty->push_with_pop_on_full(buff, time, index); +        }catch(const std::exception &e){ +            std::cerr << "Error (usrp2 recv pirate loop): " << e.what() << std::endl; +        } +    } +} + +/*********************************************************************** + * Helper Functions + **********************************************************************/ +void usrp2_impl::io_init(void){ +    //send a small data packet so the usrp2 knows the udp source port +    BOOST_FOREACH(zero_copy_if::sptr data_transport, _data_transports){ +        managed_send_buffer::sptr send_buff = data_transport->get_send_buff(); +        static const boost::uint32_t data = htonl(USRP2_INVALID_VRT_HEADER); +        std::memcpy(send_buff->cast<void*>(), &data, sizeof(data)); +        send_buff->commit(sizeof(data)); +        //drain the recv buffers (may have junk) +        while (data_transport->get_recv_buff().get()); +    } + +    //the number of recv frames is the number for the first transport +    //the assumption is that all data transports should be identical +    size_t num_frames = _data_transports.front()->get_num_recv_frames(); + +    //create new io impl +    _io_impl = UHD_PIMPL_MAKE(io_impl, (num_frames, _data_transports.size())); + +    //create a new pirate thread for each zc if (yarr!!) +    for (size_t i = 0; i < _data_transports.size(); i++){ +        _io_impl->recv_pirate_crew.create_thread(boost::bind( +            &usrp2_impl::io_impl::recv_pirate_loop, +            _io_impl.get(), _data_transports.at(i), +            _mboards.at(i), i +        )); +    } + +    std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; +    std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl; +    std::cout << "Recv pirate num frames: " << num_frames << std::endl; +} + +/*********************************************************************** + * Async Data + **********************************************************************/ +bool usrp2_impl::recv_async_msg( +    async_metadata_t &async_metadata, size_t timeout_ms +){ +    boost::this_thread::disable_interruption di; //disable because the wait can throw +    return _io_impl->async_msg_fifo->pop_with_timed_wait( +        async_metadata, boost::posix_time::milliseconds(timeout_ms) +    ); +} + +/*********************************************************************** + * Send Data + **********************************************************************/ +bool get_send_buffs( +    const std::vector<udp_zero_copy::sptr> &trans, +    vrt_packet_handler::managed_send_buffs_t &buffs +){ +    UHD_ASSERT_THROW(trans.size() == buffs.size()); +    for (size_t i = 0; i < buffs.size(); i++){ +        buffs[i] = trans[i]->get_send_buff(); +    } +    return true; +} + +size_t usrp2_impl::send( +    const std::vector<const void *> &buffs, size_t num_samps, +    const tx_metadata_t &metadata, const io_type_t &io_type, +    send_mode_t send_mode +){ +    return vrt_packet_handler::send( +        _io_impl->packet_handler_send_state,       //last state of the send handler +        buffs, num_samps,                          //buffer to fill +        metadata, send_mode,                       //samples metadata +        io_type, _io_helper.get_tx_otw_type(),     //input and output types to convert +        _mboards.front()->get_master_clock_freq(), //master clock tick rate +        uhd::transport::vrt::if_hdr_pack_be, +        boost::bind(&get_send_buffs, _data_transports, _1), +        get_max_send_samps_per_packet() +    ); +} + +/*********************************************************************** + * Receive Data + **********************************************************************/ +size_t usrp2_impl::recv( +    const std::vector<void *> &buffs, size_t num_samps, +    rx_metadata_t &metadata, const io_type_t &io_type, +    recv_mode_t recv_mode, size_t timeout_ms +){ +    return vrt_packet_handler::recv( +        _io_impl->packet_handler_recv_state,       //last state of the recv handler +        buffs, num_samps,                          //buffer to fill +        metadata, recv_mode,                       //samples metadata +        io_type, _io_helper.get_rx_otw_type(),     //input and output types to convert +        _mboards.front()->get_master_clock_freq(), //master clock tick rate +        uhd::transport::vrt::if_hdr_unpack_be, +        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout_ms) +    ); +} | 
