diff options
Diffstat (limited to 'host/lib/usrp/usrp_e100')
| -rw-r--r-- | host/lib/usrp/usrp_e100/io_impl.cpp | 48 | ||||
| -rw-r--r-- | host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp | 160 | 
2 files changed, 142 insertions, 66 deletions
diff --git a/host/lib/usrp/usrp_e100/io_impl.cpp b/host/lib/usrp/usrp_e100/io_impl.cpp index 5fb2da7b8..fc6aaeaee 100644 --- a/host/lib/usrp/usrp_e100/io_impl.cpp +++ b/host/lib/usrp/usrp_e100/io_impl.cpp @@ -48,13 +48,10 @@ static const bool recv_debug = false;   * - vrt packet handler states   **********************************************************************/  struct usrp_e100_impl::io_impl{ -    //state management for the vrt packet handler code -    vrt_packet_handler::recv_state packet_handler_recv_state; -    vrt_packet_handler::send_state packet_handler_send_state; -    zero_copy_if::sptr data_xport; -    bool continuous_streaming;      io_impl(usrp_e100_iface::sptr iface):          data_xport(usrp_e100_make_mmap_zero_copy(iface)), +        get_recv_buffs_fcn(boost::bind(&usrp_e100_impl::io_impl::get_recv_buffs, this, _1)), +        get_send_buffs_fcn(boost::bind(&usrp_e100_impl::io_impl::get_send_buffs, this, _1)),          recv_pirate_booty(data_xport->get_num_recv_frames()),          async_msg_fifo(100/*messages deep*/)      { @@ -67,12 +64,34 @@ struct usrp_e100_impl::io_impl{          recv_pirate_crew.join_all();      } -    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){ +    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs){          UHD_ASSERT_THROW(buffs.size() == 1);          boost::this_thread::disable_interruption di; //disable because the wait can throw -        return recv_pirate_booty.pop_with_timed_wait(buffs.front(), timeout); +        return recv_pirate_booty.pop_with_timed_wait(buffs.front(), recv_timeout);      } +    bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &buffs){ +        UHD_ASSERT_THROW(buffs.size() == 1); +        buffs[0] = data_xport->get_send_buff(send_timeout); +        return buffs[0].get() != NULL; +    } + +    //The data transport is listed first so that it is deconstructed last, +    //which is after the states and booty which may hold managed buffers. +    zero_copy_if::sptr data_xport; + +    //bound callbacks for get buffs (bound once here, not in fast-path) +    vrt_packet_handler::get_recv_buffs_t get_recv_buffs_fcn; +    vrt_packet_handler::get_send_buffs_t get_send_buffs_fcn; + +    //timeouts set on calls to recv/send (passed into get buffs methods) +    double recv_timeout, send_timeout; + +    //state management for the vrt packet handler code +    vrt_packet_handler::recv_state packet_handler_recv_state; +    vrt_packet_handler::send_state packet_handler_send_state; +    bool continuous_streaming; +      //a pirate's life is the life for me!      void recv_pirate_loop(usrp_e100_clock_ctrl::sptr);      bounded_buffer<managed_recv_buffer::sptr> recv_pirate_booty; @@ -204,15 +223,6 @@ void usrp_e100_impl::handle_overrun(size_t){  /***********************************************************************   * Data Send   **********************************************************************/ -bool get_send_buffs( -    zero_copy_if::sptr trans, double timeout, -    vrt_packet_handler::managed_send_buffs_t &buffs -){ -    UHD_ASSERT_THROW(buffs.size() == 1); -    buffs[0] = trans->get_send_buff(timeout); -    return buffs[0].get() != NULL; -} -  size_t usrp_e100_impl::get_max_send_samps_per_packet(void) const{      static const size_t hdr_size = 0          + vrt::max_if_hdr_words32*sizeof(boost::uint32_t) @@ -227,6 +237,7 @@ size_t usrp_e100_impl::send(      const tx_metadata_t &metadata, const io_type_t &io_type,      send_mode_t send_mode, double timeout  ){ +    _io_impl->send_timeout = timeout;      return vrt_packet_handler::send(          _io_impl->packet_handler_send_state,       //last state of the send handler          buffs, num_samps,                          //buffer to fill @@ -234,7 +245,7 @@ size_t usrp_e100_impl::send(          io_type, _send_otw_type,                   //input and output types to convert          _clock_ctrl->get_fpga_clock_rate(),        //master clock tick rate          uhd::transport::vrt::if_hdr_pack_le, -        boost::bind(&get_send_buffs, _io_impl->data_xport, timeout, _1), +        _io_impl->get_send_buffs_fcn,          get_max_send_samps_per_packet()      );  } @@ -257,6 +268,7 @@ size_t usrp_e100_impl::recv(      rx_metadata_t &metadata, const io_type_t &io_type,      recv_mode_t recv_mode, double timeout  ){ +    _io_impl->recv_timeout = timeout;      return vrt_packet_handler::recv(          _io_impl->packet_handler_recv_state,       //last state of the recv handler          buffs, num_samps,                          //buffer to fill @@ -264,7 +276,7 @@ size_t usrp_e100_impl::recv(          io_type, _recv_otw_type,                   //input and output types to convert          _clock_ctrl->get_fpga_clock_rate(),        //master clock tick rate          uhd::transport::vrt::if_hdr_unpack_le, -        boost::bind(&usrp_e100_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout), +        _io_impl->get_recv_buffs_fcn,          boost::bind(&usrp_e100_impl::handle_overrun, this, _1)      );  } diff --git a/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp b/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp index 4e0137fdb..c155d426a 100644 --- a/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp +++ b/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp @@ -22,7 +22,7 @@  #include <sys/mman.h> //mmap  #include <unistd.h> //getpagesize  #include <poll.h> //poll -#include <boost/bind.hpp> +#include <vector>  #include <iostream>  using namespace uhd; @@ -33,6 +33,82 @@ static const bool sp_verbose = false; //slow-path verbose  static const size_t poll_breakout = 10; //how many poll timeouts constitute a full timeout  /*********************************************************************** + * Reusable managed receiver buffer: + *  - The buffer knows how to claim and release a frame. + **********************************************************************/ +class usrp_e100_mmap_zero_copy_mrb : public managed_recv_buffer{ +public: +    usrp_e100_mmap_zero_copy_mrb(void *mem, ring_buffer_info *info): +        _mem(mem), _info(info) { /* NOP */ } + +    void release(void){ +        if (_info->flags != RB_USER_PROCESS) return; +        if (fp_verbose) std::cout << "recv buff: release" << std::endl; +        _info->flags = RB_KERNEL; //release the frame +    } + +    bool ready(void){return _info->flags & RB_USER;} + +    sptr get_new(void){ +        if (fp_verbose) std::cout << "  make_recv_buff: " << get_size() << std::endl; +        _info->flags = RB_USER_PROCESS; //claim the frame +        return sptr(this, &usrp_e100_mmap_zero_copy_mrb::fake_deleter); +    } + +private: +    static void fake_deleter(void *obj){ +        static_cast<usrp_e100_mmap_zero_copy_mrb *>(obj)->release(); +    } + +    const void *get_buff(void) const{return _mem;} +    size_t get_size(void) const{return _info->len;} + +    void *_mem; +    ring_buffer_info *_info; +}; + +/*********************************************************************** + * Reusable managed send buffer: + *  - The buffer knows how to claim and release a frame. + **********************************************************************/ +class usrp_e100_mmap_zero_copy_msb : public managed_send_buffer{ +public: +    usrp_e100_mmap_zero_copy_msb(void *mem, ring_buffer_info *info, size_t len, int fd): +        _mem(mem), _info(info), _len(len), _fd(fd) { /* NOP */ } + +    void commit(size_t len){ +        if (_info->flags != RB_USER_PROCESS) return; +        if (fp_verbose) std::cout << "send buff: commit " << len << std::endl; +        _info->len = len; +        _info->flags = RB_USER; //release the frame +        if (::write(_fd, NULL, 0) < 0){ //notifies the kernel +            std::cerr << UHD_THROW_SITE_INFO("write error") << std::endl; +        } +    } + +    bool ready(void){return _info->flags & RB_KERNEL;} + +    sptr get_new(void){ +        if (fp_verbose) std::cout << "  make_send_buff: " << get_size() << std::endl; +        _info->flags = RB_USER_PROCESS; //claim the frame +        return sptr(this, &usrp_e100_mmap_zero_copy_msb::fake_deleter); +    } + +private: +    static void fake_deleter(void *obj){ +        static_cast<usrp_e100_mmap_zero_copy_msb *>(obj)->commit(0); +    } + +    void *get_buff(void) const{return _mem;} +    size_t get_size(void) const{return _len;} + +    void *_mem; +    ring_buffer_info *_info; +    size_t _len; +    int _fd; +}; + +/***********************************************************************   * The zero copy interface implementation   **********************************************************************/  class usrp_e100_mmap_zero_copy_impl : public zero_copy_if{ @@ -81,13 +157,32 @@ public:              std::cout << "send_buff_off: " << send_buff_off << std::endl;          } +        //pointers to sections in the mapped memory +        ring_buffer_info (*recv_info)[], (*send_info)[]; +        char *recv_buff, *send_buff; +          //set the internal pointers for info and buffers          typedef ring_buffer_info (*rbi_pta)[];          char *rb_ptr = reinterpret_cast<char *>(_mapped_mem); -        _recv_info = reinterpret_cast<rbi_pta>(rb_ptr + recv_info_off); -        _recv_buff = rb_ptr + recv_buff_off; -        _send_info = reinterpret_cast<rbi_pta>(rb_ptr + send_info_off); -        _send_buff = rb_ptr + send_buff_off; +        recv_info = reinterpret_cast<rbi_pta>(rb_ptr + recv_info_off); +        recv_buff = rb_ptr + recv_buff_off; +        send_info = reinterpret_cast<rbi_pta>(rb_ptr + send_info_off); +        send_buff = rb_ptr + send_buff_off; + +        //initialize the managed receive buffers +        for (size_t i = 0; i < get_num_recv_frames(); i++){ +            _mrb_pool.push_back(usrp_e100_mmap_zero_copy_mrb( +                recv_buff + get_recv_frame_size()*i, (*recv_info) + i +            )); +        } + +        //initialize the managed send buffers +        for (size_t i = 0; i < get_num_recv_frames(); i++){ +            _msb_pool.push_back(usrp_e100_mmap_zero_copy_msb( +                send_buff + get_send_frame_size()*i, (*send_info) + i, +                get_send_frame_size(), _fd +            )); +        }      }      ~usrp_e100_mmap_zero_copy_impl(void){ @@ -97,13 +192,10 @@ public:      managed_recv_buffer::sptr get_recv_buff(double timeout){          if (fp_verbose) std::cout << "get_recv_buff: " << _recv_index << std::endl; - -        //grab pointers to the info and buffer -        ring_buffer_info *info = (*_recv_info) + _recv_index; -        void *mem = _recv_buff + _frame_size*_recv_index; +        usrp_e100_mmap_zero_copy_mrb &mrb = _mrb_pool[_recv_index];          //poll/wait for a ready frame -        if (not (info->flags & RB_USER)){ +        if (not mrb.ready()){              for (size_t i = 0; i < poll_breakout; i++){                  pollfd pfd;                  pfd.fd = _fd; @@ -115,18 +207,11 @@ public:              return managed_recv_buffer::sptr(); //timed-out for real          } found_user_frame: -        //the process has claimed the frame -        info->flags = RB_USER_PROCESS; -          //increment the index for the next call -        if (++_recv_index == size_t(_rb_size.num_rx_frames)) _recv_index = 0; +        if (++_recv_index == get_num_recv_frames()) _recv_index = 0;          //return the managed buffer for this frame -        if (fp_verbose) std::cout << "  make_recv_buff: " << info->len << std::endl; -        return managed_recv_buffer::make_safe( -            mem, info->len, -            boost::bind(&usrp_e100_mmap_zero_copy_impl::release, this, info) -        ); +        return mrb.get_new();      }      size_t get_num_recv_frames(void) const{ @@ -139,13 +224,10 @@ public:      managed_send_buffer::sptr get_send_buff(double timeout){          if (fp_verbose) std::cout << "get_send_buff: " << _send_index << std::endl; - -        //grab pointers to the info and buffer -        ring_buffer_info *info = (*_send_info) + _send_index; -        void *mem = _send_buff + _frame_size*_send_index; +        usrp_e100_mmap_zero_copy_msb &msb = _msb_pool[_send_index];          //poll/wait for a ready frame -        if (not (info->flags & RB_KERNEL)){ +        if (not msb.ready()){              pollfd pfd;              pfd.fd = _fd;              pfd.events = POLLOUT; @@ -155,14 +237,10 @@ public:          }          //increment the index for the next call -        if (++_send_index == size_t(_rb_size.num_tx_frames)) _send_index = 0; +        if (++_send_index == get_num_send_frames()) _send_index = 0;          //return the managed buffer for this frame -        if (fp_verbose) std::cout << "  make_send_buff: " << _frame_size << std::endl; -        return managed_send_buffer::make_safe( -            mem, _frame_size, -            boost::bind(&usrp_e100_mmap_zero_copy_impl::commit, this, info, _1) -        ); +        return msb.get_new();      }      size_t get_num_send_frames(void) const{ @@ -174,21 +252,7 @@ public:      }  private: - -    void release(ring_buffer_info *info){ -        if (fp_verbose) std::cout << "recv buff: release" << std::endl; -        info->flags = RB_KERNEL; -    } - -    void commit(ring_buffer_info *info, size_t len){ -        if (fp_verbose) std::cout << "send buff: commit " << len << std::endl; -        info->len = len; -        info->flags = RB_USER; -        if (::write(_fd, NULL, 0) < 0){ -            std::cerr << UHD_THROW_SITE_INFO("write error") << std::endl; -        } -    } - +    //file descriptor for mmap      int _fd;      //the mapped memory itself @@ -198,9 +262,9 @@ private:      usrp_e_ring_buffer_size_t _rb_size;      size_t _frame_size, _map_size; -    //pointers to sections in the mapped memory -    ring_buffer_info (*_recv_info)[], (*_send_info)[]; -    char *_recv_buff, *_send_buff; +    //re-usable managed buffers +    std::vector<usrp_e100_mmap_zero_copy_mrb> _mrb_pool; +    std::vector<usrp_e100_mmap_zero_copy_msb> _msb_pool;      //indexes into sub-sections of mapped memory      size_t _recv_index, _send_index;  | 
