diff options
| author | Josh Blum <josh@joshknows.com> | 2010-12-22 17:45:25 -0800 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2010-12-22 17:45:25 -0800 | 
| commit | 434ee07c4ba1ce9faee8ecf3a309042ca424cbda (patch) | |
| tree | a286584d98a0550ad9e8196e828295e52a133c83 | |
| parent | 67e89717659605e4d8e0ddd26e4ccef4dec24eb2 (diff) | |
| parent | de45f2234ca7ce8a1efd79525323bef55f1f9d44 (diff) | |
| download | uhd-434ee07c4ba1ce9faee8ecf3a309042ca424cbda.tar.gz uhd-434ee07c4ba1ce9faee8ecf3a309042ca424cbda.tar.bz2 uhd-434ee07c4ba1ce9faee8ecf3a309042ca424cbda.zip | |
Merge branch 'udp_ports' into next
Conflicts:
	firmware/microblaze/apps/txrx_uhd.c
	host/lib/usrp/usrp2/mboard_impl.cpp
	host/lib/usrp/usrp2/usrp2_impl.cpp
	host/lib/usrp/usrp2/usrp2_impl.hpp
| -rw-r--r-- | firmware/microblaze/apps/txrx_uhd.c | 24 | ||||
| -rw-r--r-- | firmware/microblaze/usrp2/memory_map.h | 12 | ||||
| -rw-r--r-- | firmware/microblaze/usrp2p/memory_map.h | 12 | ||||
| -rw-r--r-- | host/docs/transport.rst | 2 | ||||
| -rw-r--r-- | host/include/uhd/transport/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 69 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.ipp | 144 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 13 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 7 | ||||
| -rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 1 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/fw_common.h | 4 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 203 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/mboard_impl.cpp | 7 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.cpp | 28 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 6 | ||||
| -rw-r--r-- | host/test/buffer_test.cpp | 51 | 
16 files changed, 247 insertions, 338 deletions
| diff --git a/firmware/microblaze/apps/txrx_uhd.c b/firmware/microblaze/apps/txrx_uhd.c index 517b50e91..7dc44f09f 100644 --- a/firmware/microblaze/apps/txrx_uhd.c +++ b/firmware/microblaze/apps/txrx_uhd.c @@ -55,15 +55,26 @@ static void setup_network(void);  static eth_mac_addr_t fp_mac_addr_src, fp_mac_addr_dst;  extern struct socket_address fp_socket_src, fp_socket_dst; -void handle_udp_data_packet( +static void handle_udp_err0_packet( +    struct socket_address src, struct socket_address dst, +    unsigned char *payload, int payload_len +){ +    sr_udp_sm->err0_port = (((uint32_t)dst.port) << 16) | src.port; +    printf("Storing for async error path:\n"); +    printf("  source udp port: %d\n", dst.port); +    printf("  destination udp port: %d\n", src.port); +    newline(); +} + +static void handle_udp_data_packet(      struct socket_address src, struct socket_address dst,      unsigned char *payload, int payload_len  ){ -    //its a tiny payload, load the fast-path variables      fp_mac_addr_src = *ethernet_mac_addr();      arp_cache_lookup_mac(&src.addr, &fp_mac_addr_dst);      fp_socket_src = dst;      fp_socket_dst = src; +    sr_udp_sm->dsp0_port = (((uint32_t)dst.port) << 16) | src.port;      printf("Storing for fast path:\n");      printf("  source mac addr: ");      print_mac_addr(fp_mac_addr_src.addr); newline(); @@ -77,7 +88,7 @@ void handle_udp_data_packet(      printf("  destination udp port: %d\n", fp_socket_dst.port);      newline(); -    //setup network and vrt +    //setup network      setup_network();  } @@ -85,7 +96,7 @@ void handle_udp_data_packet(  #define OTW_GPIO_BANK_TO_NUM(bank) \      (((bank) == USRP2_DIR_RX)? (GPIO_RX_BANK) : (GPIO_TX_BANK)) -void handle_udp_ctrl_packet( +static void handle_udp_ctrl_packet(      struct socket_address src, struct socket_address dst,      unsigned char *payload, int payload_len  ){ @@ -319,8 +330,8 @@ static void setup_network(void){    sr_udp_sm->ip_hdr.checksum = UDP_SM_INS_IP_HDR_CHKSUM | (chksum & 0xffff);    //setup the udp header machine -  sr_udp_sm->udp_hdr.src_port = fp_socket_src.port; -  sr_udp_sm->udp_hdr.dst_port = fp_socket_dst.port; +  sr_udp_sm->udp_hdr.src_port = UDP_SM_INS_UDP_SRC_PORT; +  sr_udp_sm->udp_hdr.dst_port = UDP_SM_INS_UDP_DST_PORT;    sr_udp_sm->udp_hdr.length = UDP_SM_INS_UDP_LEN;    sr_udp_sm->udp_hdr.checksum = UDP_SM_LAST_WORD;		// zero UDP checksum  } @@ -354,6 +365,7 @@ main(void)    //2) register callbacks for udp ports we service    register_udp_listener(USRP2_UDP_CTRL_PORT, handle_udp_ctrl_packet);    register_udp_listener(USRP2_UDP_DATA_PORT, handle_udp_data_packet); +  register_udp_listener(USRP2_UDP_ERR0_PORT, handle_udp_err0_packet);    register_udp_listener(USRP2_UDP_UPDATE_PORT, handle_udp_fw_update_packet);    //3) set the routing mode to slave and send a garp diff --git a/firmware/microblaze/usrp2/memory_map.h b/firmware/microblaze/usrp2/memory_map.h index 23d96389f..a2de29cdb 100644 --- a/firmware/microblaze/usrp2/memory_map.h +++ b/firmware/microblaze/usrp2/memory_map.h @@ -330,11 +330,21 @@ typedef struct {      uint32_t length;      uint32_t checksum; //word 22    } udp_hdr; -  volatile uint32_t _pad[32-23]; +  volatile uint32_t _pad[1]; +  volatile uint32_t dsp0_port; +  volatile uint32_t err0_port; +  volatile uint32_t dsp1_port; +  volatile uint32_t err1_port;  } sr_udp_sm_t;  // control bits (all expect UDP_SM_LAST_WORD are mutually exclusive) +// Insert a UDP source port from the table +#define UDP_SM_INS_UDP_SRC_PORT     (1 << 21) + +// Insert a UDP dest port from the table +#define UDP_SM_INS_UDP_DST_PORT     (1 << 20) +  // This is the last word of the header  #define	UDP_SM_LAST_WORD		(1 << 19) diff --git a/firmware/microblaze/usrp2p/memory_map.h b/firmware/microblaze/usrp2p/memory_map.h index 2b5ae57be..6f5c577e6 100644 --- a/firmware/microblaze/usrp2p/memory_map.h +++ b/firmware/microblaze/usrp2p/memory_map.h @@ -323,11 +323,21 @@ typedef struct {      uint32_t length;      uint32_t checksum; //word 22    } udp_hdr; -  volatile uint32_t _pad[32-23]; +  volatile uint32_t _pad[1]; +  volatile uint32_t dsp0_port; +  volatile uint32_t err0_port; +  volatile uint32_t dsp1_port; +  volatile uint32_t err1_port;  } sr_udp_sm_t;  // control bits (all expect UDP_SM_LAST_WORD are mutually exclusive) +// Insert a UDP source port from the table +#define UDP_SM_INS_UDP_SRC_PORT     (1 << 21) + +// Insert a UDP dest port from the table +#define UDP_SM_INS_UDP_DST_PORT     (1 << 20) +  // This is the last word of the header  #define	UDP_SM_LAST_WORD		(1 << 19) diff --git a/host/docs/transport.rst b/host/docs/transport.rst index d9abd4923..018f909c1 100644 --- a/host/docs/transport.rst +++ b/host/docs/transport.rst @@ -36,7 +36,7 @@ The following parameters can be used to alter the transport's default behavior:  * **num_send_frames:** The number of send buffers to allocate  * **concurrency_hint:** The number of threads to run the IO service -**Note:** num_send_frames and concurrency_hint will not have an effect +**Note:** num_send_frames will not have an effect  as the asynchronous send implementation is currently disabled.  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt index 2c84c0724..ec3b7b113 100644 --- a/host/include/uhd/transport/CMakeLists.txt +++ b/host/include/uhd/transport/CMakeLists.txt @@ -17,8 +17,6 @@  INSTALL(FILES -    alignment_buffer.hpp -    alignment_buffer.ipp      bounded_buffer.hpp      bounded_buffer.ipp      convert_types.hpp diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp deleted file mode 100644 index f44a037f8..000000000 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ /dev/null @@ -1,69 +0,0 @@ -// -// Copyright 2010 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program.  If not, see <http://www.gnu.org/licenses/>. -// - -#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP -#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP - -#include <uhd/config.hpp> -#include <uhd/transport/bounded_buffer.hpp> //time_duration_t -#include <boost/shared_ptr.hpp> -#include <vector> - -namespace uhd{ namespace transport{ - -    /*! -     * Implement a templated alignment buffer: -     * Used for aligning asynchronously pushed elements with matching ids. -     */ -    template <typename elem_type, typename seq_type> class alignment_buffer{ -    public: -        typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr; - -        /*! -         * Make a new alignment buffer object. -         * \param capacity the maximum elements per index -         * \param width the number of elements to align -         */ -        static sptr make(size_t capacity, size_t width); - -        /*! -         * Push an element with sequence id into the buffer at index. -         * \param elem the element to push -         * \param seq the sequence identifier -         * \param index the buffer index -         * \return true if the element fit without popping for space -         */ -        virtual bool push_with_pop_on_full( -            const elem_type &elem, const seq_type &seq, size_t index -        ) = 0; - -        /*! -         * Pop an aligned set of elements from this alignment buffer. -         * \param elems a collection to store the aligned elements -         * \param timeout the timeout in seconds -         * \return false when the operation times out -         */ -        virtual bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, double timeout -        ) = 0; -    }; - -}} //namespace - -#include <uhd/transport/alignment_buffer.ipp> - -#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */ diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp deleted file mode 100644 index 833b5d399..000000000 --- a/host/include/uhd/transport/alignment_buffer.ipp +++ /dev/null @@ -1,144 +0,0 @@ -// -// Copyright 2010 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program.  If not, see <http://www.gnu.org/licenses/>. -// - -#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP -#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP - -#include <uhd/transport/bounded_buffer.hpp> -#include <boost/thread/condition_variable.hpp> -#include <utility> - -namespace uhd{ namespace transport{ namespace{ /*anon*/ - -    /*! -     * Imlement a templated alignment buffer: -     * Used for aligning asynchronously pushed elements with matching ids. -     */ -    template <typename elem_type, typename seq_type> -    class alignment_buffer_impl : public alignment_buffer<elem_type, seq_type>{ -    public: - -        alignment_buffer_impl(size_t capacity, size_t width) : _last_seqs(width){ -            for (size_t i = 0; i < width; i++){ -                _buffs.push_back(bounded_buffer<buff_contents_type>::make(capacity)); -                _all_indexes.push_back(i); -            } -            _there_was_a_clear = false; -        } - -        UHD_INLINE bool push_with_pop_on_full( -            const elem_type &elem, const seq_type &seq, size_t index -        ){ -            //clear the buffer for this index if the seqs are mis-ordered -            if (seq < _last_seqs[index]){ -                _buffs[index]->clear(); -                _there_was_a_clear = true; -            } _last_seqs[index] = seq; -            return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq)); -        } - -        UHD_INLINE bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, double timeout -        ){ -            boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); -            buff_contents_type buff_contents_tmp; -            std::list<size_t> indexes_to_do(_all_indexes); - -            //do an initial pop to load an initial sequence id -            size_t index = indexes_to_do.front(); -            if (not _buffs[index]->pop_with_timed_wait( -                buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) -            )) return false; -            elems[index] = buff_contents_tmp.first; -            seq_type expected_seq_id = buff_contents_tmp.second; -            indexes_to_do.pop_front(); - -            //get an aligned set of elements from the buffers: -            while(indexes_to_do.size() != 0){ - -                //respond to a clear by starting from scratch -                if(_there_was_a_clear){ -                    _there_was_a_clear = false; -                    indexes_to_do = _all_indexes; -                    index = indexes_to_do.front(); -                    if (not _buffs[index]->pop_with_timed_wait( -                        buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) -                    )) return false; -                    elems[index] = buff_contents_tmp.first; -                    expected_seq_id = buff_contents_tmp.second; -                    indexes_to_do.pop_front(); -                } - -                //pop an element off for this index -                index = indexes_to_do.front(); -                if (not _buffs[index]->pop_with_timed_wait( -                    buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) -                )) return false; - -                //if the sequence id matches: -                //  store the popped element into the output, -                //  remove this index from the list and continue -                if (buff_contents_tmp.second == expected_seq_id){ -                    elems[index] = buff_contents_tmp.first; -                    indexes_to_do.pop_front(); -                    continue; -                } - -                //if the sequence id is older: -                //  continue with the same index to try again -                if (buff_contents_tmp.second < expected_seq_id){ -                    continue; -                } - -                //if the sequence id is newer: -                //  store the popped element into the output, -                //  add all other indexes back into the list -                if (buff_contents_tmp.second > expected_seq_id){ -                    elems[index] = buff_contents_tmp.first; -                    expected_seq_id = buff_contents_tmp.second; -                    indexes_to_do = _all_indexes; -                    indexes_to_do.remove(index); -                    continue; -                } -            } -            return true; -        } - -    private: -        //a vector of bounded buffers for each index -        typedef std::pair<elem_type, seq_type> buff_contents_type; -        std::vector<typename bounded_buffer<buff_contents_type>::sptr> _buffs; -        std::vector<seq_type> _last_seqs; -        std::list<size_t> _all_indexes; -        bool _there_was_a_clear; -    }; - -}}} //namespace - -namespace uhd{ namespace transport{ - -    template <typename elem_type, typename seq_type> -    typename alignment_buffer<elem_type, seq_type>::sptr -    alignment_buffer<elem_type, seq_type>::make(size_t capacity, size_t width){ -        return typename alignment_buffer<elem_type, seq_type>::sptr( -            new alignment_buffer_impl<elem_type, seq_type>(capacity, width) -        ); -    } - -}} //namespace - -#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP */ diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index edc7faa06..f7915d866 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -26,14 +26,6 @@  namespace uhd{ namespace transport{ namespace{ /*anon*/ -    static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ -        return boost::posix_time::microseconds(long(timeout*1e6)); -    } - -    static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ -        return 1e-6*time_dur.total_microseconds(); -    } -      template <typename elem_type>      class bounded_buffer_impl : public bounded_buffer<elem_type>{      public: @@ -127,6 +119,11 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              _buffer.pop_back();              return elem;          } + +        static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ +            return boost::posix_time::microseconds(long(timeout*1e6)); +        } +      };  }}} //namespace diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index c758fa894..bbd63836c 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -37,14 +37,15 @@ namespace asio = boost::asio;   **********************************************************************/  //Define this to the the boost async io calls to perform receive.  //Otherwise, get_recv_buff uses a blocking receive with timeout. -//#define USE_ASIO_ASYNC_RECV +#define USE_ASIO_ASYNC_RECV  //Define this to the the boost async io calls to perform send.  //Otherwise, the commit callback uses a blocking send.  //#define USE_ASIO_ASYNC_SEND -//enough buffering for half a second of samples at full rate on usrp2 -static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); +//By default, this buffer is sized insufficiently small. +//For peformance, this buffer should be 10s of megabytes. +static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(10e3);  //Large buffers cause more underflow at high rates.  //Perhaps this is due to the kernel scheduling, diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index 278bcfeaa..7f8d84308 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -91,6 +91,7 @@ template <typename T> UHD_INLINE T get_context_code(          //vrt unpack each managed buffer          uhd::transport::vrt::if_packet_info_t if_packet_info;          for (size_t i = 0; i < state.width; i++){ +            if (state.managed_buffs[i].get() == NULL) continue; //better have a message packet coming up...              //extract packet words and check thats its enough to move on              size_t num_packet_words32 = state.managed_buffs[i]->size()/sizeof(boost::uint32_t); diff --git a/host/lib/usrp/usrp2/fw_common.h b/host/lib/usrp/usrp2/fw_common.h index efbb4b954..ee7fc3882 100644 --- a/host/lib/usrp/usrp2/fw_common.h +++ b/host/lib/usrp/usrp2/fw_common.h @@ -42,7 +42,9 @@ extern "C" {  // udp ports for the usrp2 communication  // Dynamic and/or private ports: 49152-65535  #define USRP2_UDP_CTRL_PORT 49152 -#define USRP2_UDP_DATA_PORT 49153 +//#define USRP2_UDP_UPDATE_PORT 49154 +#define USRP2_UDP_DATA_PORT 49156 +#define USRP2_UDP_ERR0_PORT 49157  ////////////////////////////////////////////////////////////////////////  // I2C addresses diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index cbc0a0817..5a6c0983c 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -21,11 +21,13 @@  #include <uhd/utils/byteswap.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/transport/convert_types.hpp> -#include <uhd/transport/alignment_buffer.hpp> +#include <uhd/transport/bounded_buffer.hpp>  #include <boost/format.hpp>  #include <boost/bind.hpp>  #include <boost/thread.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp>  #include <iostream> +#include <list>  using namespace uhd;  using namespace uhd::usrp; @@ -108,16 +110,24 @@ private:   * - vrt packet handler states   **********************************************************************/  struct usrp2_impl::io_impl{ -    typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type; -    io_impl(size_t num_recv_frames, size_t send_frame_size, size_t width): +    io_impl(size_t send_frame_size, size_t width):          packet_handler_recv_state(width), -        recv_pirate_booty(alignment_buffer_type::make(num_recv_frames-3, width)),          async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/))      { -        for (size_t i = 0; i < width; i++) fc_mons.push_back( -            flow_control_monitor::sptr(new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size)) -        ); +        for (size_t i = 0; i < width; i++){ +            fc_mons.push_back(flow_control_monitor::sptr( +                new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size) +            )); +            //init empty packet infos +            vrt::if_packet_info_t packet_info; +            packet_info.packet_count = 0xf; +            packet_info.has_tsi = true; +            packet_info.tsi = 0; +            packet_info.has_tsf = true; +            packet_info.tsf = 0; +            prev_infos.push_back(packet_info); +        }      }      ~io_impl(void){ @@ -126,11 +136,6 @@ struct usrp2_impl::io_impl{          recv_pirate_crew.join_all();      } -    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout); -    } -      bool get_send_buffs(          const std::vector<zero_copy_if::sptr> &trans,          vrt_packet_handler::managed_send_buffs_t &buffs, @@ -151,6 +156,15 @@ struct usrp2_impl::io_impl{          return true;      } +    bool get_recv_buffs( +        const std::vector<zero_copy_if::sptr> &xports, +        vrt_packet_handler::managed_recv_buffs_t &buffs, +        double timeout +    ); + +    //previous state for each buffer +    std::vector<vrt::if_packet_info_t> prev_infos; +      //flow control monitors      std::vector<flow_control_monitor::sptr> fc_mons; @@ -162,29 +176,28 @@ struct usrp2_impl::io_impl{      void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);      boost::thread_group recv_pirate_crew;      bool recv_pirate_crew_raiding; -    alignment_buffer_type::sptr recv_pirate_booty;      bounded_buffer<async_metadata_t>::sptr async_msg_fifo;      boost::mutex spawn_mutex;  };  /***********************************************************************   * Receive Pirate Loop - * - while raiding, loot for recv buffers - * - put booty into the alignment buffer + * - while raiding, loot for message packet + * - update flow control condition count + * - put async message packets into queue   **********************************************************************/  void usrp2_impl::io_impl::recv_pirate_loop( -    zero_copy_if::sptr zc_if, +    zero_copy_if::sptr zc_if_err0,      usrp2_mboard_impl::sptr mboard,      size_t index  ){      set_thread_priority_safe();      recv_pirate_crew_raiding = true; -    size_t next_packet_seq = 0;      spawn_mutex.unlock();      while(recv_pirate_crew_raiding){ -        managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); +        managed_recv_buffer::sptr buff = zc_if_err0->get_recv_buff();          if (not buff.get()) continue; //ignore timeout/error buffers          try{ @@ -194,26 +207,6 @@ void usrp2_impl::io_impl::recv_pirate_loop(              const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();              vrt::if_hdr_unpack_be(vrt_hdr, if_packet_info); -            //handle the rx data stream -            if (if_packet_info.sid == usrp2_impl::RECV_SID){ -                //handle the packet count / sequence number -                if (if_packet_info.packet_count != next_packet_seq){ -                    //std::cerr << "S" << (if_packet_info.packet_count - next_packet_seq)%16; -                    std::cerr << "O" << std::flush; //report overflow (drops in the kernel) -                } -                next_packet_seq = (if_packet_info.packet_count+1)%16; - -                //extract the timespec and round to the nearest packet -                UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf); -                time_spec_t time( -                    time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() -                ); - -                //push the packet into the buffer with the new time -                recv_pirate_booty->push_with_pop_on_full(buff, time, index); -                continue; -            } -              //handle a tx async report message              if (if_packet_info.sid == usrp2_impl::ASYNC_SID and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ @@ -253,11 +246,10 @@ void usrp2_impl::io_impl::recv_pirate_loop(  void usrp2_impl::io_init(void){      //the assumption is that all data transports should be identical -    const size_t num_recv_frames = _data_transports.front()->get_num_recv_frames();      const size_t send_frame_size = _data_transports.front()->get_send_frame_size();      //create new io impl -    _io_impl = UHD_PIMPL_MAKE(io_impl, (num_recv_frames, send_frame_size, _data_transports.size())); +    _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports.size()));      //TODO temporary fix for weird power up state, remove when FPGA fixed      { @@ -276,7 +268,7 @@ void usrp2_impl::io_init(void){          //spawn a new pirate to plunder the recv booty          _io_impl->recv_pirate_crew.create_thread(boost::bind(              &usrp2_impl::io_impl::recv_pirate_loop, -            _io_impl.get(), _data_transports.at(i), +            _io_impl.get(), _err0_transports.at(i),              _mboards.at(i), i          ));          //block here until the spawned thread unlocks @@ -328,6 +320,133 @@ size_t usrp2_impl::send(  }  /*********************************************************************** + * Alignment logic on receive + **********************************************************************/ +static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ +    return boost::posix_time::microseconds(long(timeout*1e6)); +} + +static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ +    return 1e-6*time_dur.total_microseconds(); +} + +static UHD_INLINE time_spec_t extract_time_spec( +    const vrt::if_packet_info_t &packet_info +){ +    return time_spec_t( //assumes has_tsi and has_tsf are true +        time_t(packet_info.tsi), size_t(packet_info.tsf), +        100e6 //tick rate does not have to be correct for comparison purposes +    ); +} + +static UHD_INLINE void extract_packet_info( +    managed_recv_buffer::sptr &buff, +    vrt::if_packet_info_t &prev_info, +    time_spec_t &time, bool &clear, bool &msg +){ +    //extract packet info +    vrt::if_packet_info_t next_info; +    next_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t); +    vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), next_info); + +    //handle the packet count / sequence number +    if ((prev_info.packet_count+1)%16 != next_info.packet_count){ +        std::cerr << "O" << std::flush; //report overflow (drops in the kernel) +    } + +    time = extract_time_spec(next_info); +    clear = extract_time_spec(prev_info) > time; +    msg = next_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA; +    prev_info = next_info; +} + +static UHD_INLINE bool handle_msg_packet( +    vrt_packet_handler::managed_recv_buffs_t &buffs, size_t index +){ +    for (size_t i = 0; i < buffs.size(); i++){ +        if (i == index) continue; +        buffs[i].reset(); //set NULL +    } +    return true; +} + +UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( +    const std::vector<zero_copy_if::sptr> &xports, +    vrt_packet_handler::managed_recv_buffs_t &buffs, +    double timeout +){ +    if (buffs.size() == 1){ +        buffs[0] = xports[0]->get_recv_buff(timeout); +        if (buffs[0].get() == NULL) return false; +        bool clear, msg; time_spec_t time; //unused variables +        //call extract_packet_info to handle printing the overflows +        extract_packet_info(buffs[0], this->prev_infos[0], time, clear, msg); +        return true; +    } +    //-------------------- begin alignment logic ---------------------// +    boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); +    managed_recv_buffer::sptr buff_tmp; +    std::list<size_t> _all_indexes, indexes_to_do; +    for (size_t i = 0; i < buffs.size(); i++) _all_indexes.push_back(i); +    bool clear, msg; +    time_spec_t expected_time; + +    //respond to a clear by starting from scratch +    got_clear: +    indexes_to_do = _all_indexes; +    clear = false; + +    //do an initial pop to load an initial sequence id +    size_t index = indexes_to_do.front(); +    buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); +    if (buff_tmp.get() == NULL) return false; +    extract_packet_info(buff_tmp, this->prev_infos[index], expected_time, clear, msg); +    if (clear) goto got_clear; +    buffs[index] = buff_tmp; +    if (msg) return handle_msg_packet(buffs, index); +    indexes_to_do.pop_front(); + +    //get an aligned set of elements from the buffers: +    while(indexes_to_do.size() != 0){ + +        //pop an element off for this index +        index = indexes_to_do.front(); +        buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); +        if (buff_tmp.get() == NULL) return false; +        time_spec_t this_time; +        extract_packet_info(buff_tmp, this->prev_infos[index], this_time, clear, msg); +        if (clear) goto got_clear; +        buffs[index] = buff_tmp; +        if (msg) return handle_msg_packet(buffs, index); + +        //if the sequence id matches: +        //  remove this index from the list and continue +        if (this_time == expected_time){ +            indexes_to_do.pop_front(); +            continue; +        } + +        //if the sequence id is older: +        //  continue with the same index to try again +        else if (this_time < expected_time){ +            continue; +        } + +        //if the sequence id is newer: +        //  use the new expected time for comparison +        //  add all other indexes back into the list +        else{ +            expected_time = this_time; +            indexes_to_do = _all_indexes; +            indexes_to_do.remove(index); +            continue; +        } +    } +    return true; +    //-------------------- end alignment logic -----------------------// +} + +/***********************************************************************   * Receive Data   **********************************************************************/  size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{ @@ -357,7 +476,7 @@ size_t usrp2_impl::recv(          io_type, _rx_otw_type,                     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_unpack_be, -        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout), +        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _data_transports, _1, timeout),          boost::bind(&handle_overflow, _mboards, _1)      );  } diff --git a/host/lib/usrp/usrp2/mboard_impl.cpp b/host/lib/usrp/usrp2/mboard_impl.cpp index b8ebd6030..72d1c9d03 100644 --- a/host/lib/usrp/usrp2/mboard_impl.cpp +++ b/host/lib/usrp/usrp2/mboard_impl.cpp @@ -42,6 +42,7 @@ usrp2_mboard_impl::usrp2_mboard_impl(      size_t index,      transport::udp_simple::sptr ctrl_transport,      transport::zero_copy_if::sptr data_transport, +    transport::zero_copy_if::sptr err0_transport,      const device_addr_t &device_args,      size_t recv_samps_per_packet  ): @@ -51,11 +52,15 @@ usrp2_mboard_impl::usrp2_mboard_impl(      //Send a small data packet so the usrp2 knows the udp source port.      //This setup must happen before further initialization occurs      //or the async update packets will cause ICMP destination unreachable. -    transport::managed_send_buffer::sptr send_buff = data_transport->get_send_buff(); +    transport::managed_send_buffer::sptr send_buff;      static const boost::uint32_t data[2] = {          uhd::htonx(boost::uint32_t(0 /* don't care seq num */)),          uhd::htonx(boost::uint32_t(USRP2_INVALID_VRT_HEADER))      }; +    send_buff = data_transport->get_send_buff(); +    std::memcpy(send_buff->cast<void*>(), &data, sizeof(data)); +    send_buff->commit(sizeof(data)); +    send_buff = err0_transport->get_send_buff();      std::memcpy(send_buff->cast<void*>(), &data, sizeof(data));      send_buff->commit(sizeof(data)); diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp index 133c39a35..f910999d4 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.cpp +++ b/host/lib/usrp/usrp2/usrp2_impl.cpp @@ -197,10 +197,18 @@ static device_addrs_t usrp2_find(const device_addr_t &hint_){   * Make   **********************************************************************/  static device::sptr usrp2_make(const device_addr_t &device_addr){ -sep_indexed_dev_addrs(device_addr); + +    //setup the dsp transport hints (default to a large recv buff) +    device_addr_t dsp_xport_hints = device_addr; +    if (not dsp_xport_hints.has_key("recv_buff_size")){ +        //set to half-a-second of buffering at max rate +        dsp_xport_hints["recv_buff_size"] = "50e6"; +    } +      //create a ctrl and data transport for each address      std::vector<udp_simple::sptr> ctrl_transports;      std::vector<zero_copy_if::sptr> data_transports; +    std::vector<zero_copy_if::sptr> err0_transports;      const device_addrs_t device_addrs = sep_indexed_dev_addrs(device_addr);      BOOST_FOREACH(const device_addr_t &dev_addr_i, device_addrs){ @@ -208,14 +216,17 @@ sep_indexed_dev_addrs(device_addr);              dev_addr_i["addr"], num2str(USRP2_UDP_CTRL_PORT)          ));          data_transports.push_back(udp_zero_copy::make( -            dev_addr_i["addr"], num2str(USRP2_UDP_DATA_PORT), device_addr +            dev_addr_i["addr"], num2str(USRP2_UDP_DATA_PORT), dsp_xport_hints +        )); +        err0_transports.push_back(udp_zero_copy::make( +            dev_addr_i["addr"], num2str(USRP2_UDP_ERR0_PORT), device_addr_t()          ));      }      //create the usrp2 implementation guts -    return device::sptr( -        new usrp2_impl(ctrl_transports, data_transports, device_addrs) -    ); +    return device::sptr(new usrp2_impl( +        ctrl_transports, data_transports, err0_transports, device_addrs +    ));  }  UHD_STATIC_BLOCK(register_usrp2_device){ @@ -228,9 +239,11 @@ UHD_STATIC_BLOCK(register_usrp2_device){  usrp2_impl::usrp2_impl(      std::vector<udp_simple::sptr> ctrl_transports,      std::vector<zero_copy_if::sptr> data_transports, +    std::vector<zero_copy_if::sptr> err0_transports,      const device_addrs_t &device_args  ): -    _data_transports(data_transports) +    _data_transports(data_transports), +    _err0_transports(err0_transports)  {      //setup rx otw type      _rx_otw_type.width = 16; @@ -247,7 +260,8 @@ usrp2_impl::usrp2_impl(      //create a new mboard handler for each control transport      for(size_t i = 0; i < device_args.size(); i++){          _mboards.push_back(usrp2_mboard_impl::sptr(new usrp2_mboard_impl( -            i, ctrl_transports[i], data_transports[i], device_args[i], +            i, ctrl_transports[i], data_transports[i], +            err0_transports[i], device_args[i],              this->get_max_recv_samps_per_packet()          )));          //use an empty name when there is only one mboard diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 85c00b079..9cd27ee41 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -85,6 +85,7 @@ public:          size_t index,          uhd::transport::udp_simple::sptr,          uhd::transport::zero_copy_if::sptr, +        uhd::transport::zero_copy_if::sptr,          const uhd::device_addr_t &device_args,          size_t recv_samps_per_packet      ); @@ -186,11 +187,13 @@ public:       * Create a new usrp2 impl base.       * \param ctrl_transports the udp transports for control       * \param data_transports the udp transports for data -     * \param flow_control_hints optional flow control params +     * \param err0_transports the udp transports for error +     * \param device_args optional misc device parameters       */      usrp2_impl(          std::vector<uhd::transport::udp_simple::sptr> ctrl_transports,          std::vector<uhd::transport::zero_copy_if::sptr> data_transports, +        std::vector<uhd::transport::zero_copy_if::sptr> err0_transports,          const uhd::device_addrs_t &device_args      ); @@ -222,6 +225,7 @@ private:      //io impl methods and members      std::vector<uhd::transport::zero_copy_if::sptr> _data_transports; +    std::vector<uhd::transport::zero_copy_if::sptr> _err0_transports;      uhd::otw_type_t _rx_otw_type, _tx_otw_type;      UHD_PIMPL_DECL(io_impl) _io_impl;      void io_init(void); diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp index 8445412e7..e7bc88699 100644 --- a/host/test/buffer_test.cpp +++ b/host/test/buffer_test.cpp @@ -17,7 +17,6 @@  #include <boost/test/unit_test.hpp>  #include <uhd/transport/bounded_buffer.hpp> -#include <uhd/transport/alignment_buffer.hpp>  #include <boost/assign/list_of.hpp>  using namespace boost::assign; @@ -63,53 +62,3 @@ BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){      BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 3);  } - -BOOST_AUTO_TEST_CASE(test_alignment_buffer){ -    alignment_buffer<int, size_t>::sptr ab(alignment_buffer<int, size_t>::make(7, 3)); -    //load index 0 with all good seq numbers -    BOOST_CHECK(ab->push_with_pop_on_full(0, 0, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(1, 1, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(2, 2, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(3, 3, 0)); -    BOOST_CHECK(ab->push_with_pop_on_full(4, 4, 0)); - -    //load index 1 with some skipped seq numbers -    BOOST_CHECK(ab->push_with_pop_on_full(10, 0, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(11, 1, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(14, 4, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(15, 5, 1)); -    BOOST_CHECK(ab->push_with_pop_on_full(16, 6, 1)); - -    //load index 2 with all good seq numbers -    BOOST_CHECK(ab->push_with_pop_on_full(20, 0, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(21, 1, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(22, 2, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(23, 3, 2)); -    BOOST_CHECK(ab->push_with_pop_on_full(24, 4, 2)); - -    //readback aligned values -    std::vector<int> aligned_elems(3); - -    static const std::vector<int> expected_elems0 = list_of(0)(10)(20); -    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); -    BOOST_CHECK_EQUAL_COLLECTIONS( -        aligned_elems.begin(), aligned_elems.end(), -        expected_elems0.begin(), expected_elems0.end() -    ); - -    static const std::vector<int> expected_elems1 = list_of(1)(11)(21); -    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); -    BOOST_CHECK_EQUAL_COLLECTIONS( -        aligned_elems.begin(), aligned_elems.end(), -        expected_elems1.begin(), expected_elems1.end() -    ); - -    //there was a skip now find 4 - -    static const std::vector<int> expected_elems4 = list_of(4)(14)(24); -    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); -    BOOST_CHECK_EQUAL_COLLECTIONS( -        aligned_elems.begin(), aligned_elems.end(), -        expected_elems4.begin(), expected_elems4.end() -    ); -} | 
