diff options
| -rw-r--r-- | host/include/uhd/device.hpp | 11 | ||||
| -rw-r--r-- | host/include/uhd/types/otw_type.hpp | 6 | ||||
| -rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 280 | ||||
| -rw-r--r-- | host/lib/types.cpp | 4 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 48 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 7 | 
6 files changed, 232 insertions, 124 deletions
| diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp index ae75e6dc8..ecbe8c95c 100644 --- a/host/include/uhd/device.hpp +++ b/host/include/uhd/device.hpp @@ -80,12 +80,11 @@ public:       * Send a buffer containing IF data with its metadata.       *       * Send handles fragmentation as follows: -     * If the buffer has more samples than the maximum supported, -     * the send method will send the maximum number of samples -     * as supported by the transport and return the number sent. -     * In this case, the end of burst flag will be forced to false. -     * It is up to the caller to call send again on the un-sent -     * portions of the buffer, until the buffer is exhausted. +     * If the buffer has more samples than the maximum per packet, +     * the send method will fragment the samples across several packets. +     * Send will respect the burst flags when fragmenting to ensure +     * that start of burst can only be set on the first fragment and +     * that end of burst can only be set on the final fragment.       *       * This is a blocking call and will not return until the number       * of samples returned have been read out of the buffer. diff --git a/host/include/uhd/types/otw_type.hpp b/host/include/uhd/types/otw_type.hpp index f10664584..8e3e65d78 100644 --- a/host/include/uhd/types/otw_type.hpp +++ b/host/include/uhd/types/otw_type.hpp @@ -55,6 +55,12 @@ namespace uhd{              BO_NOT_APPLICABLE = '|'          } byteorder; +        /*! +         * Get the sample size of this otw type. +         * \return the size of a sample in bytes +         */ +        size_t get_sample_size(void) const; +          otw_type_t(void);      }; diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index a6161ef55..cd0c89b05 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -57,6 +57,49 @@ namespace vrt_packet_handler{          /* NOP */      } +    /******************************************************************* +     * Unpack a received vrt header and set the copy buffer. +     *  - helper function for vrt_packet_handler::recv +     ******************************************************************/ +    static inline void _recv_helper( +        recv_state &state, +        uhd::rx_metadata_t &metadata, +        double tick_rate, +        size_t vrt_header_offset_words32 +    ){ +        size_t num_packet_words32 = state.managed_buff->size()/sizeof(boost::uint32_t); +        if (num_packet_words32 <= vrt_header_offset_words32){ +            state.copy_buff = boost::asio::buffer("", 0); +            return; //must exit here after setting the buffer +        } +        const boost::uint32_t *vrt_hdr = state.managed_buff->cast<const boost::uint32_t *>() + vrt_header_offset_words32; +        size_t num_header_words32_out, num_payload_words32_out, packet_count_out; +        uhd::transport::vrt::unpack( +            metadata,                //output +            vrt_hdr,                 //input +            num_header_words32_out,  //output +            num_payload_words32_out, //output +            num_packet_words32,      //input +            packet_count_out,        //output +            tick_rate +        ); + +        //handle the packet count / sequence number +        if (packet_count_out != state.next_packet_seq){ +            std::cerr << "S" << (packet_count_out - state.next_packet_seq)%16; +        } +        state.next_packet_seq = (packet_count_out+1)%16; + +        //setup the buffer to point to the data +        state.copy_buff = boost::asio::buffer( +            vrt_hdr + num_header_words32_out, +            num_payload_words32_out*sizeof(boost::uint32_t) +        ); +    } + +    /******************************************************************* +     * Recv vrt packets and copy convert the samples into the buffer. +     ******************************************************************/      static inline size_t recv(          recv_state &state,          const boost::asio::mutable_buffer &buff, @@ -65,95 +108,184 @@ namespace vrt_packet_handler{          const uhd::otw_type_t &otw_type,          double tick_rate,          uhd::transport::zero_copy_if::sptr zc_iface, +        //use these two params to handle a layer above vrt          size_t vrt_header_offset_words32 = 0,          const recv_cb_t& recv_cb = &recv_cb_nop      ){ -        //////////////////////////////////////////////////////////////// -        // Perform the recv -        //////////////////////////////////////////////////////////////// -        { -            //perform a receive if no rx data is waiting to be copied -            if (boost::asio::buffer_size(state.copy_buff) == 0){ -                state.fragment_offset_in_samps = 0; -                state.managed_buff = zc_iface->get_recv_buff(); -                recv_cb(state.managed_buff); -            } -            //otherwise flag the metadata to show that is is a fragment -            else{ -                metadata = uhd::rx_metadata_t(); -                goto vrt_recv_copy_convert; -            } -        } +        metadata = uhd::rx_metadata_t(); //init the metadata -        //////////////////////////////////////////////////////////////// -        // Unpack the vrt header -        //////////////////////////////////////////////////////////////// -        { -            size_t num_packet_words32 = state.managed_buff->size()/sizeof(boost::uint32_t); -            if (num_packet_words32 <= vrt_header_offset_words32){ -                state.copy_buff = boost::asio::buffer("", 0); -                goto vrt_recv_copy_convert; //must exit here after setting the buffer -            } -            const boost::uint32_t *vrt_hdr = state.managed_buff->cast<const boost::uint32_t *>() + vrt_header_offset_words32; -            size_t num_header_words32_out, num_payload_words32_out, packet_count_out; -            uhd::transport::vrt::unpack( -                metadata,                //output -                vrt_hdr,                 //input -                num_header_words32_out,  //output -                num_payload_words32_out, //output -                num_packet_words32,      //input -                packet_count_out,        //output -                tick_rate +        //perform a receive if no rx data is waiting to be copied +        if (boost::asio::buffer_size(state.copy_buff) == 0){ +            state.fragment_offset_in_samps = 0; +            state.managed_buff = zc_iface->get_recv_buff(); +            recv_cb(state.managed_buff); //callback before vrt unpack +            _recv_helper( +                state, metadata, tick_rate, vrt_header_offset_words32              ); +        } -            //handle the packet count / sequence number -            if (packet_count_out != state.next_packet_seq){ -                std::cerr << "S" << (packet_count_out - state.next_packet_seq)%16; -            } -            state.next_packet_seq = (packet_count_out+1)%16; +        //extract the number of samples available to copy +        size_t bytes_per_item = otw_type.get_sample_size(); +        size_t bytes_available = boost::asio::buffer_size(state.copy_buff); +        size_t num_samps = std::min( +            boost::asio::buffer_size(buff)/io_type.size, +            bytes_available/bytes_per_item +        ); -            //setup the buffer to point to the data -            state.copy_buff = boost::asio::buffer( -                vrt_hdr + num_header_words32_out, -                num_payload_words32_out*sizeof(boost::uint32_t) -            ); +        //setup the fragment flags and offset +        metadata.more_fragments = boost::asio::buffer_size(buff)/io_type.size < num_samps; +        metadata.fragment_offset = state.fragment_offset_in_samps; +        state.fragment_offset_in_samps += num_samps; //set for next call + +        //copy-convert the samples from the recv buffer +        uhd::transport::convert_otw_type_to_io_type( +            boost::asio::buffer_cast<const void*>(state.copy_buff), otw_type, +            boost::asio::buffer_cast<void*>(buff), io_type, +            num_samps +        ); + +        //update the rx copy buffer to reflect the bytes copied +        size_t bytes_copied = num_samps*bytes_per_item; +        state.copy_buff = boost::asio::buffer( +            boost::asio::buffer_cast<const boost::uint8_t*>(state.copy_buff) + bytes_copied, +            bytes_available - bytes_copied +        ); + +        return num_samps; +    } + +/*********************************************************************** + * vrt packet handler for send + **********************************************************************/ +    struct send_state{ +        //init the expected seq number +        size_t next_packet_seq; + +        send_state(void){ +            next_packet_seq = 0;          } +    }; -        //////////////////////////////////////////////////////////////// -        // Perform copy-convert into buffer -        //////////////////////////////////////////////////////////////// -        vrt_recv_copy_convert:{ -            size_t bytes_per_item = (otw_type.width * 2) / 8; - -            //extract the number of samples available to copy -            //and a pointer into the usrp2 received items memory -            size_t bytes_available = boost::asio::buffer_size(state.copy_buff); -            size_t num_samps = std::min( -                boost::asio::buffer_size(buff)/io_type.size, -                bytes_available/bytes_per_item -            ); +    typedef boost::function<void(uhd::transport::managed_send_buffer::sptr)> send_cb_t; -            //setup the fragment flags and offset -            metadata.more_fragments = boost::asio::buffer_size(buff)/io_type.size < num_samps; -            metadata.fragment_offset = state.fragment_offset_in_samps; -            state.fragment_offset_in_samps += num_samps; //set for next time +    static inline void send_cb_nop(uhd::transport::managed_send_buffer::sptr){ +        /* NOP */ +    } -            //copy-convert the samples from the recv buffer -            uhd::transport::convert_otw_type_to_io_type( -                boost::asio::buffer_cast<const void*>(state.copy_buff), otw_type, -                boost::asio::buffer_cast<void*>(buff), io_type, -                num_samps -            ); +    /******************************************************************* +     * Pack a vrt header, copy-convert the data, and send it. +     *  - helper function for vrt_packet_handler::send +     ******************************************************************/ +    static inline void _send_helper( +        send_state &state, +        const void *send_mem, +        size_t num_samps, +        const uhd::tx_metadata_t &metadata, +        const uhd::io_type_t &io_type, +        const uhd::otw_type_t &otw_type, +        double tick_rate, +        uhd::transport::zero_copy_if::sptr zc_iface, +        size_t vrt_header_offset_words32, +        const send_cb_t& send_cb +    ){ +        //get a new managed send buffer +        uhd::transport::managed_send_buffer::sptr send_buff = zc_iface->get_send_buff(); +        boost::uint32_t *tx_mem = send_buff->cast<boost::uint32_t *>() + vrt_header_offset_words32; + +        size_t num_header_words32, num_packet_words32; +        size_t packet_count = state.next_packet_seq++; -            //update the rx copy buffer to reflect the bytes copied -            size_t bytes_copied = num_samps*bytes_per_item; -            state.copy_buff = boost::asio::buffer( -                boost::asio::buffer_cast<const boost::uint8_t*>(state.copy_buff) + bytes_copied, -                bytes_available - bytes_copied +        //pack metadata into a vrt header +        uhd::transport::vrt::pack( +            metadata,            //input +            tx_mem,              //output +            num_header_words32,  //output +            num_samps,           //input +            num_packet_words32,  //output +            packet_count,        //input +            tick_rate +        ); + +        //copy-convert the samples into the send buffer +        uhd::transport::convert_io_type_to_otw_type( +            send_mem, io_type, +            tx_mem + num_header_words32, otw_type, +            num_samps +        ); + +        send_cb(send_buff); //callback after memory filled + +        //commit the samples to the zero-copy interface +        send_buff->done(num_packet_words32*sizeof(boost::uint32_t)); +    } + +    /******************************************************************* +     * Send vrt packets and copy convert the samples into the buffer. +     ******************************************************************/ +    static inline size_t send( +        send_state &state, +        const boost::asio::const_buffer &buff, +        const uhd::tx_metadata_t &metadata, +        const uhd::io_type_t &io_type, +        const uhd::otw_type_t &otw_type, +        double tick_rate, +        uhd::transport::zero_copy_if::sptr zc_iface, +        size_t max_samples_per_packet, +        //use these two params to handle a layer above vrt +        size_t vrt_header_offset_words32 = 0, +        const send_cb_t& send_cb = &send_cb_nop +    ){ + +        size_t total_num_samps = boost::asio::buffer_size(buff)/io_type.size; + +        //special case: no fragmentation needed +        if (total_num_samps <= max_samples_per_packet){ +            _send_helper( +                state, +                boost::asio::buffer_cast<const void *>(buff), +                total_num_samps, +                metadata, +                io_type, otw_type, +                tick_rate, +                zc_iface, +                vrt_header_offset_words32, +                send_cb              ); +            return total_num_samps; +        } -            return num_samps; +        //calculate constants for fragmentation +        const size_t final_packet_samps = total_num_samps%max_samples_per_packet; +        const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet; +        static const size_t first_fragment_index = 0; +        const size_t final_fragment_index = num_fragments-1; + +        //make a rw copy of the metadata to re-flag below +        uhd::tx_metadata_t md(metadata); + +        //loop through the following fragment indexes +        for (size_t n = first_fragment_index; n <= final_fragment_index; n++){ + +            //calculate new flags for the fragments +            md.has_time_spec  = md.has_time_spec  and (n == first_fragment_index); +            md.start_of_burst = md.start_of_burst and (n == first_fragment_index); +            md.end_of_burst   = md.end_of_burst   and (n == final_fragment_index); + +            //send the fragment with the helper function +            _send_helper( +                state, +                boost::asio::buffer_cast<const boost::uint8_t *>(buff) + (n*max_samples_per_packet*io_type.size), +                (n == final_fragment_index)?final_packet_samps:max_samples_per_packet, +                md, +                io_type, otw_type, +                tick_rate, +                zc_iface, +                vrt_header_offset_words32, +                send_cb +            );          } + +        return total_num_samps;      }  } //namespace vrt_packet_handler diff --git a/host/lib/types.cpp b/host/lib/types.cpp index 33eb550a1..87a2ad4ab 100644 --- a/host/lib/types.cpp +++ b/host/lib/types.cpp @@ -220,6 +220,10 @@ std::string mac_addr_t::to_string(void) const{  /***********************************************************************   * otw type   **********************************************************************/ +size_t otw_type_t::get_sample_size(void) const{ +    return (this->width * 2) / 8; +} +  otw_type_t::otw_type_t(void){      width = 0;      shift = 0; diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 389460fe8..b26dbb8bd 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -67,49 +67,17 @@ void usrp2_impl::io_init(void){   **********************************************************************/  size_t usrp2_impl::send(      const asio::const_buffer &buff, -    const tx_metadata_t &metadata_, +    const tx_metadata_t &metadata,      const io_type_t &io_type  ){ -    tx_metadata_t metadata = metadata_; //rw copy to change later - -    transport::managed_send_buffer::sptr send_buff = _data_transport->get_send_buff(); -    boost::uint32_t *tx_mem = send_buff->cast<boost::uint32_t *>(); -    size_t num_samps = std::min(std::min( -        asio::buffer_size(buff)/io_type.size, -        size_t(max_tx_samps_per_packet())), -        send_buff->size()/io_type.size -    ); - -    //kill the end of burst flag if this is a fragment -    if (asio::buffer_size(buff)/io_type.size < num_samps) -        metadata.end_of_burst = false; - -    size_t num_header_words32, num_packet_words32; -    size_t packet_count = _tx_stream_id_to_packet_seq[metadata.stream_id]++; - -    //pack metadata into a vrt header -    vrt::pack( -        metadata,            //input -        tx_mem,              //output -        num_header_words32,  //output -        num_samps,           //input -        num_packet_words32,  //output -        packet_count,        //input -        get_master_clock_freq() -    ); - -    boost::uint32_t *items = tx_mem + num_header_words32; //offset for data - -    //copy-convert the samples into the send buffer -    convert_io_type_to_otw_type( -        asio::buffer_cast<const void*>(buff), io_type, -        (void*)items, _tx_otw_type, -        num_samps +    return vrt_packet_handler::send( +        _packet_handler_send_state, //last state of the send handler +        buff, metadata,             //buffer to empty and samples metadata +        io_type, _tx_otw_type,      //input and output types to convert +        get_master_clock_freq(),    //master clock tick rate +        _data_transport,            //zero copy interface +        max_tx_samps_per_packet()      ); - -    //send and return number of samples -    send_buff->done(num_packet_words32*sizeof(boost::uint32_t)); -    return num_samps;  }  /*********************************************************************** diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 6958fc8ea..f98aa1938 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -122,8 +122,6 @@ private:      codec_ctrl::sptr _codec_ctrl;      serdes_ctrl::sptr _serdes_ctrl; -    uhd::dict<boost::uint32_t, size_t> _tx_stream_id_to_packet_seq; -      /*******************************************************************       * Deal with the rx and tx packet sizes       ******************************************************************/ @@ -139,13 +137,14 @@ private:          uhd::transport::vrt::max_header_words32*sizeof(boost::uint32_t)      ;      size_t max_rx_samps_per_packet(void){ -        return _max_rx_bytes_per_packet/(_rx_otw_type.width*2/8); +        return _max_rx_bytes_per_packet/_rx_otw_type.get_sample_size();      }      size_t max_tx_samps_per_packet(void){ -        return _max_tx_bytes_per_packet/(_tx_otw_type.width*2/8); +        return _max_tx_bytes_per_packet/_tx_otw_type.get_sample_size();      }      vrt_packet_handler::recv_state _packet_handler_recv_state; +    vrt_packet_handler::send_state _packet_handler_send_state;      uhd::otw_type_t _rx_otw_type, _tx_otw_type;      void io_init(void); | 
