diff options
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/device.cpp | 18 | ||||
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/transport/libusb1_base.cpp | 16 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 212 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 209 | ||||
-rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 53 | ||||
-rw-r--r-- | host/lib/transport/zero_copy.cpp | 121 | ||||
-rw-r--r-- | host/lib/usrp/dboard/db_tvrx.cpp | 4 | ||||
-rw-r--r-- | host/lib/usrp/usrp1/io_impl.cpp | 85 | ||||
-rw-r--r-- | host/lib/usrp/usrp1/usrp1_impl.cpp | 32 | ||||
-rw-r--r-- | host/lib/usrp/usrp1/usrp1_impl.hpp | 7 | ||||
-rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 36 | ||||
-rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 6 | ||||
-rw-r--r-- | host/lib/version.cpp | 14 |
14 files changed, 385 insertions, 430 deletions
diff --git a/host/lib/device.cpp b/host/lib/device.cpp index d575ebaab..386588a08 100644 --- a/host/lib/device.cpp +++ b/host/lib/device.cpp @@ -26,6 +26,7 @@ #include <boost/functional/hash.hpp> #include <boost/tuple/tuple.hpp> #include <stdexcept> +#include <iostream> using namespace uhd; @@ -73,12 +74,17 @@ device_addrs_t device::find(const device_addr_t &hint){ device_addrs_t device_addrs; BOOST_FOREACH(const dev_fcn_reg_t &fcn, get_dev_fcn_regs()){ - device_addrs_t discovered_addrs = fcn.get<0>()(hint); - device_addrs.insert( - device_addrs.begin(), - discovered_addrs.begin(), - discovered_addrs.end() - ); + try{ + device_addrs_t discovered_addrs = fcn.get<0>()(hint); + device_addrs.insert( + device_addrs.begin(), + discovered_addrs.begin(), + discovered_addrs.end() + ); + } + catch(const std::exception &e){ + std::cerr << "Device discovery error: " << e.what() << std::endl; + } } return device_addrs; diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index e9e82932c..eafdd57d2 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -36,6 +36,8 @@ IF(LIBUSB_FOUND) INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/lib/transport/msvc) ENDIF(MSVC) SET(HAVE_USB_SUPPORT TRUE) +ELSE(LIBUSB_FOUND) + #TODO dummy usb ENDIF(LIBUSB_FOUND) IF(HAVE_USB_SUPPORT) diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index 49f524a32..910b04fc8 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -213,15 +213,21 @@ private: libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::sptr dev){ static uhd::dict<libusb_device *, boost::weak_ptr<device_handle> > handles; - //not expired -> get existing session + //not expired -> get existing handle if (handles.has_key(dev->get()) and not handles[dev->get()].expired()){ return handles[dev->get()].lock(); } - //create a new global session - sptr new_handle(new libusb_device_handle_impl(dev)); - handles[dev->get()] = new_handle; - return new_handle; + //create a new cached handle + try{ + sptr new_handle(new libusb_device_handle_impl(dev)); + handles[dev->get()] = new_handle; + return new_handle; + } + catch(const std::exception &e){ + std::cerr << "USB open failed: see the application notes for your device." << std::endl; + throw std::runtime_error(e.what()); + } } /*********************************************************************** diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f9beb0b4c..ab48e4fc4 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -24,12 +24,10 @@ #include <boost/thread.hpp> #include <vector> #include <iostream> -#include <iomanip> using namespace uhd::transport; -const int libusb_timeout = 0; - +static const double CLEANUP_TIMEOUT = 0.2; //seconds static const size_t DEFAULT_NUM_XFERS = 16; //num xfers static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes @@ -84,10 +82,10 @@ public: * Get an available transfer: * For inputs, this is a just filled transfer. * For outputs, this is a just emptied transfer. - * \param timeout_ms the timeout to wait for a lut + * \param timeout the timeout to wait for a lut * \return the transfer pointer or NULL if timeout */ - libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100); + libusb_transfer *get_lut_with_wait(double timeout); //Callback use only void callback_handle_transfer(libusb_transfer *lut); @@ -97,21 +95,18 @@ private: int _endpoint; bool _input; - size_t _transfer_size; - size_t _num_transfers; - //! hold a bounded buffer of completed transfers typedef bounded_buffer<libusb_transfer *> lut_buff_type; lut_buff_type::sptr _completed_list; //! a list of all transfer structs we allocated - std::vector<libusb_transfer *> _all_luts; + std::vector<libusb_transfer *> _all_luts; - //! a list of shared arrays for the transfer buffers - std::vector<boost::shared_array<boost::uint8_t> > _buffers; + //! a block of memory for the transfer buffers + boost::shared_array<char> _buffer; // Calls for processing asynchronous I/O - libusb_transfer *allocate_transfer(int buff_len); + libusb_transfer *allocate_transfer(void *mem, size_t len); void print_transfer_status(libusb_transfer *lut); }; @@ -156,14 +151,12 @@ usb_endpoint::usb_endpoint( ): _handle(handle), _endpoint(endpoint), - _input(input), - _transfer_size(transfer_size), - _num_transfers(num_transfers) + _input(input) { _completed_list = lut_buff_type::make(num_transfers); - - for (size_t i = 0; i < _num_transfers; i++){ - _all_luts.push_back(allocate_transfer(_transfer_size)); + _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]); + for (size_t i = 0; i < num_transfers; i++){ + _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size)); //input luts are immediately submitted to be filled //output luts go into the completed list as free buffers @@ -187,7 +180,7 @@ usb_endpoint::~usb_endpoint(void){ } //collect canceled transfers (drain the queue) - while (this->get_lut_with_wait() != NULL){}; + while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){}; //free all transfers BOOST_FOREACH(libusb_transfer *lut, _all_luts){ @@ -200,23 +193,23 @@ usb_endpoint::~usb_endpoint(void){ * Allocate a libusb transfer * The allocated transfer - and buffer it contains - is repeatedly * submitted, reaped, and reused and should not be freed until shutdown. - * \param buff_len size of the individual buffer held by each transfer + * \param mem a pointer to the buffer memory + * \param len size of the individual buffer * \return pointer to an allocated libusb_transfer */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ +libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){ libusb_transfer *lut = libusb_alloc_transfer(0); - - boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]); - _buffers.push_back(buff); //store a reference to this shared array + UHD_ASSERT_THROW(lut != NULL); unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); + unsigned char *buff = reinterpret_cast<unsigned char *>(mem); libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); libusb_fill_bulk_transfer(lut, // transfer _handle->get(), // dev_handle endpoint, // endpoint - buff.get(), // buffer - buff_len, // length + buff, // buffer + len, // length lut_callback, // callback this, // user_data 0); // timeout @@ -239,6 +232,7 @@ void usb_endpoint::submit(libusb_transfer *lut){ * \param lut pointer to an libusb_transfer */ void usb_endpoint::print_transfer_status(libusb_transfer *lut){ + std::cout << "here " << lut->status << std::endl; switch (lut->status) { case LIBUSB_TRANSFER_COMPLETED: if (lut->actual_length < lut->length) { @@ -274,136 +268,52 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){ } } -libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ +libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw libusb_transfer *lut; - if (_completed_list->pop_with_timed_wait( - lut, boost::posix_time::milliseconds(timeout_ms) - )) return lut; + if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut; return NULL; } /*********************************************************************** - * Managed buffers + * USB zero_copy device class **********************************************************************/ -/* - * Libusb managed receive buffer - * Construct a recv buffer from a libusb transfer. The memory held by - * the libusb transfer is exposed through the managed buffer interface. - * Upon destruction, the transfer and buffer are resubmitted to the - * endpoint for further use. - */ -class libusb_managed_recv_buffer_impl : public managed_recv_buffer { +class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> { public: - libusb_managed_recv_buffer_impl(libusb_transfer *lut, - usb_endpoint::sptr endpoint) - : _buff(lut->buffer, lut->length) - { - _lut = lut; - _endpoint = endpoint; - } - - ~libusb_managed_recv_buffer_impl(void){ - _endpoint->submit(_lut); - } + typedef boost::shared_ptr<libusb_zero_copy_impl> sptr; -private: - const boost::asio::const_buffer &get(void) const{ - return _buff; - } + libusb_zero_copy_impl( + libusb::device_handle::sptr handle, + unsigned int recv_endpoint, unsigned int send_endpoint, + size_t recv_xfer_size, size_t recv_num_xfers, + size_t send_xfer_size, size_t send_num_xfers + ); - libusb_transfer *_lut; - usb_endpoint::sptr _endpoint; - const boost::asio::const_buffer _buff; -}; + managed_recv_buffer::sptr get_recv_buff(double); + managed_send_buffer::sptr get_send_buff(double); -/* - * Libusb managed send buffer - * Construct a send buffer from a libusb transfer. The memory held by - * the libusb transfer is exposed through the managed buffer interface. - * Committing the buffer will set the data length and submit the buffer - * to the endpoint. Submitting a buffer multiple times or destroying - * the buffer before committing is an error. For the latter, the transfer - * is returned to the endpoint with no data for reuse. - */ -class libusb_managed_send_buffer_impl : public managed_send_buffer { -public: - libusb_managed_send_buffer_impl(libusb_transfer *lut, - usb_endpoint::sptr endpoint) - : _buff(lut->buffer, lut->length), _committed(false) - { - _lut = lut; - _endpoint = endpoint; - } + size_t get_num_recv_frames(void) const { return _recv_num_frames; } + size_t get_num_send_frames(void) const { return _send_num_frames; } - ~libusb_managed_send_buffer_impl(void){ - if (!_committed) { - _lut->length = 0; - _lut->actual_length = 0; - _endpoint->submit(_lut); - } +private: + void release(libusb_transfer *lut){ + _recv_ep->submit(lut); } - ssize_t commit(size_t num_bytes) - { - if (_committed) { - std::cerr << "UHD: send buffer already committed" << std::endl; - return 0; - } - - UHD_ASSERT_THROW(num_bytes <= boost::asio::buffer_size(_buff)); - - _lut->length = num_bytes; - _lut->actual_length = 0; - + void commit(libusb_transfer *lut, size_t num_bytes){ + lut->length = num_bytes; try{ - _endpoint->submit(_lut); - _committed = true; - return num_bytes; + _send_ep->submit(lut); } catch(const std::exception &e){ std::cerr << "Error in commit: " << e.what() << std::endl; - return -1; } } -private: - const boost::asio::mutable_buffer &get(void) const{ - return _buff; - } - - libusb_transfer *_lut; - usb_endpoint::sptr _endpoint; - const boost::asio::mutable_buffer _buff; - bool _committed; -}; - - -/*********************************************************************** - * USB zero_copy device class - **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy -{ -private: libusb::device_handle::sptr _handle; + size_t _recv_xfer_size, _send_xfer_size; size_t _recv_num_frames, _send_num_frames; usb_endpoint::sptr _recv_ep, _send_ep; - -public: - typedef boost::shared_ptr<libusb_zero_copy_impl> sptr; - - libusb_zero_copy_impl( - libusb::device_handle::sptr handle, - unsigned int recv_endpoint, unsigned int send_endpoint, - size_t recv_xfer_size, size_t recv_num_xfers, - size_t send_xfer_size, size_t send_num_xfers - ); - - managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); - managed_send_buffer::sptr get_send_buff(void); - - size_t get_num_recv_frames(void) const { return _recv_num_frames; } - size_t get_num_send_frames(void) const { return _send_num_frames; } }; /* @@ -430,7 +340,9 @@ libusb_zero_copy_impl::libusb_zero_copy_impl( UHD_ASSERT_THROW(send_xfer_size % 512 == 0); //store the num xfers for the num frames count + _recv_xfer_size = recv_xfer_size; _recv_num_frames = recv_num_xfers; + _send_xfer_size = send_xfer_size; _send_num_frames = send_num_xfers; _handle->claim_interface(2 /*in interface*/); @@ -459,15 +371,16 @@ libusb_zero_copy_impl::libusb_zero_copy_impl( * Return empty pointer if no transfer is available (timeout or error). * \return pointer to a managed receive buffer */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms){ - libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout_ms); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ + libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout); if (lut == NULL) { return managed_recv_buffer::sptr(); } else { - return managed_recv_buffer::sptr( - new libusb_managed_recv_buffer_impl(lut, - _recv_ep)); + return managed_recv_buffer::make_safe( + boost::asio::const_buffer(lut->buffer, lut->actual_length), + boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) + ); } } @@ -478,15 +391,16 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms * (timeout or error). * \return pointer to a managed send buffer */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ - libusb_transfer *lut = _send_ep->get_lut_with_wait(/* TODO timeout API */); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ + libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout); if (lut == NULL) { return managed_send_buffer::sptr(); } else { - return managed_send_buffer::sptr( - new libusb_managed_send_buffer_impl(lut, - _send_ep)); + return managed_send_buffer::make_safe( + boost::asio::mutable_buffer(lut->buffer, _send_xfer_size), + boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) + ); } } @@ -494,18 +408,18 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ * USB zero_copy make functions **********************************************************************/ usb_zero_copy::sptr usb_zero_copy::make( - usb_device_handle::sptr handle, + usb_device_handle::sptr handle, unsigned int recv_endpoint, unsigned int send_endpoint, - size_t recv_xfer_size, size_t recv_num_xfers, - size_t send_xfer_size, size_t send_num_xfers + size_t recv_xfer_size, size_t recv_num_xfers, + size_t send_xfer_size, size_t send_num_xfers ){ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( boost::static_pointer_cast<libusb::special_handle>(handle)->get_device() )); return sptr(new libusb_zero_copy_impl( - dev_handle, - recv_endpoint, send_endpoint, - recv_xfer_size, recv_num_xfers, - send_xfer_size, send_num_xfers + dev_handle, + recv_endpoint, send_endpoint, + recv_xfer_size, recv_num_xfers, + send_xfer_size, send_num_xfers )); } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 0a6c9f2af..2cf7bde18 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -17,25 +17,37 @@ #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/transport/udp_simple.hpp> //mtu +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/utils/thread_priority.hpp> #include <uhd/utils/assert.hpp> #include <uhd/utils/warning.hpp> -#include <boost/cstdint.hpp> +#include <boost/shared_array.hpp> #include <boost/asio.hpp> #include <boost/format.hpp> +#include <boost/thread.hpp> #include <iostream> +using namespace uhd; using namespace uhd::transport; +namespace asio = boost::asio; /*********************************************************************** * Constants **********************************************************************/ //enough buffering for half a second of samples at full rate on usrp2 -static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 25e6 * 0.5); +static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); + //Large buffers cause more underflow at high rates. //Perhaps this is due to the kernel scheduling, //but may change with host-based flow control. static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); +//the number of async frames to allocate for each send and recv +static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32; + +//a single concurrent thread for io_service seems to be the fastest +static const size_t CONCURRENCY_HINT = 1; + /*********************************************************************** * Zero Copy UDP implementation with ASIO: * This is the portable zero copy implementation for systems @@ -43,36 +55,57 @@ static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class udp_zero_copy_impl: - public phony_zero_copy_recv_if, - public phony_zero_copy_send_if, - public udp_zero_copy -{ +class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> { public: - typedef boost::shared_ptr<udp_zero_copy_impl> sptr; + typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; - udp_zero_copy_impl( - const std::string &addr, - const std::string &port + udp_zero_copy_asio_impl( + const std::string &addr, const std::string &port, + size_t recv_frame_size, size_t num_recv_frames, + size_t send_frame_size, size_t num_send_frames ): - phony_zero_copy_recv_if(udp_simple::mtu), - phony_zero_copy_send_if(udp_simple::mtu) + _io_service(CONCURRENCY_HINT), + _work(new asio::io_service::work(_io_service)), + _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames), + _send_frame_size(send_frame_size), _num_send_frames(num_send_frames) { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + //resolve the address + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - // create, open, and connect the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); + //create, open, and connect the socket + _socket = new asio::ip::udp::socket(_io_service); + _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); - _sock_fd = _socket->native(); } - ~udp_zero_copy_impl(void){ + void init(void){ + //allocate all recv frames and release them to begin xfers + _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); + _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]); + for (size_t i = 0; i < _num_recv_frames; i++){ + release(_recv_buffer.get() + i*_recv_frame_size); + } + + //allocate all send frames and push them into the fifo + _pending_send_buffs = pending_buffs_type::make(_num_send_frames); + _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]); + for (size_t i = 0; i < _num_send_frames; i++){ + handle_send(_send_buffer.get() + i*_send_frame_size); + } + + //spawn the service threads that will run the io service + for (size_t i = 0; i < CONCURRENCY_HINT; i++) _thread_group.create_thread( + boost::bind(&udp_zero_copy_asio_impl::service, this) + ); + } + + ~udp_zero_copy_asio_impl(void){ + delete _work; //allow io_service run to complete + _thread_group.join_all(); //wait for service threads to exit delete _socket; } @@ -90,60 +123,106 @@ public: return get_buff_size<Opt>(); } + //! pop a filled recv buffer off of the fifo and bind with the release callback + managed_recv_buffer::sptr get_recv_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_recv_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::release, + shared_from_this(), + asio::buffer_cast<void*>(buff) + ) + ); + } + return managed_recv_buffer::sptr(); + } - //The number of frames is approximately the buffer size divided by the max datagram size. - //In reality, this is a phony zero-copy interface and the number of frames is infinite. - //However, its sensible to advertise a frame count that is approximate to buffer size. - //This way, the transport caller will have an idea about how much buffering to create. + size_t get_num_recv_frames(void) const {return _num_recv_frames;} - size_t get_num_recv_frames(void) const{ - return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/udp_simple::mtu; + //! pop an empty send buffer off of the fifo and bind with the commit callback + managed_send_buffer::sptr get_send_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_send_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::commit, + shared_from_this(), + asio::buffer_cast<void*>(buff), _1 + ) + ); + } + return managed_send_buffer::sptr(); } - size_t get_num_send_frames(void) const{ - return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/udp_simple::mtu; - } + size_t get_num_send_frames(void) const {return _num_send_frames;} private: - boost::asio::ip::udp::socket *_socket; - boost::asio::io_service _io_service; - int _sock_fd; - - ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ - //setup timeval for timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = timeout_ms*1000; - - //setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(_sock_fd, &rset); - - //call select to perform timed wait - if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0; - - return ::recv( - _sock_fd, - boost::asio::buffer_cast<char *>(buff), - boost::asio::buffer_size(buff), 0 + void service(void){ + set_thread_priority_safe(); + _io_service.run(); + } + + /******************************************************************* + * The async send and receive callbacks + ******************************************************************/ + + //! handle a recv callback -> push the filled memory into the fifo + void handle_recv(void *mem, size_t len){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); + } + + //! release a recv buffer -> start an async recv on the buffer + void release(void *mem){ + _socket->async_receive( + boost::asio::buffer(mem, _recv_frame_size), + boost::bind( + &udp_zero_copy_asio_impl::handle_recv, + shared_from_this(), mem, + asio::placeholders::bytes_transferred + ) ); } - ssize_t send(const boost::asio::const_buffer &buff){ - return ::send( - _sock_fd, - boost::asio::buffer_cast<const char *>(buff), - boost::asio::buffer_size(buff), 0 + //! handle a send callback -> push the emptied memory into the fifo + void handle_send(void *mem){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size)); + } + + //! commit a send buffer -> start an async send on the buffer + void commit(void *mem, size_t len){ + _socket->async_send( + boost::asio::buffer(mem, len), + boost::bind( + &udp_zero_copy_asio_impl::handle_send, + shared_from_this(), mem + ) ); } + + //asio guts -> socket and service + asio::ip::udp::socket *_socket; + asio::io_service _io_service; + asio::io_service::work *_work; + + //memory management -> buffers and fifos + boost::thread_group _thread_group; + boost::shared_array<char> _send_buffer, _recv_buffer; + typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type; + pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; }; /*********************************************************************** * UDP zero copy make function **********************************************************************/ template<typename Opt> static void resize_buff_helper( - udp_zero_copy_impl::sptr udp_trans, + udp_zero_copy_asio_impl::sptr udp_trans, size_t target_size, const std::string &name ){ @@ -183,11 +262,17 @@ udp_zero_copy::sptr udp_zero_copy::make( size_t recv_buff_size, size_t send_buff_size ){ - udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port)); + udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl( + addr, port, + udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv + udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES //send + )); //call the helper to resize send and recv buffers - resize_buff_helper<boost::asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); - resize_buff_helper<boost::asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send"); + resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); + resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send"); + + udp_trans->init(); //buffers resized -> call init() to use return udp_trans; } diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index b603f1371..939517411 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -303,18 +303,18 @@ template <typename T> UHD_INLINE T get_context_code( * Pack a vrt header, copy-convert the data, and send it. * - helper function for vrt_packet_handler::send ******************************************************************/ - static UHD_INLINE void _send1( + static UHD_INLINE size_t _send1( send_state &state, const std::vector<const void *> &buffs, - size_t offset_bytes, - size_t num_samps, + const size_t offset_bytes, + const size_t num_samps, uhd::transport::vrt::if_packet_info_t &if_packet_info, const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, const vrt_packer_t &vrt_packer, const get_send_buffs_t &get_send_buffs, - size_t vrt_header_offset_words32, - size_t chans_per_otw_buff + const size_t vrt_header_offset_words32, + const size_t chans_per_otw_buff ){ //load the rest of the if_packet_info in here if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*otw_type.get_sample_size())/sizeof(boost::uint32_t); @@ -322,7 +322,7 @@ template <typename T> UHD_INLINE T get_context_code( //get send buffers for each channel managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); - UHD_ASSERT_THROW(get_send_buffs(send_buffs)); + if (not get_send_buffs(send_buffs)) return 0; std::vector<const void *> io_buffs(chans_per_otw_buff); for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ @@ -343,10 +343,9 @@ template <typename T> UHD_INLINE T get_context_code( //commit the samples to the zero-copy interface size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); - if (send_buffs[i]->commit(num_bytes_total) < ssize_t(num_bytes_total)){ - std::cerr << "commit to send buffer returned less than commit size" << std::endl; - } + send_buffs[i]->commit(num_bytes_total); } + return num_samps; } /******************************************************************* @@ -381,7 +380,6 @@ template <typename T> UHD_INLINE T get_context_code( //////////////////////////////////////////////////////////////// case uhd::device::SEND_MODE_ONE_PACKET:{ //////////////////////////////////////////////////////////////// - size_t num_samps = std::min(total_num_samps, max_samples_per_packet); //fill in parts of the packet info overwrote in full buff mode if_packet_info.has_tsi = metadata.has_time_spec; @@ -389,10 +387,10 @@ template <typename T> UHD_INLINE T get_context_code( if_packet_info.sob = metadata.start_of_burst; if_packet_info.eob = metadata.end_of_burst; - _send1( + return _send1( state, buffs, 0, - num_samps, + std::min(total_num_samps, max_samples_per_packet), if_packet_info, io_type, otw_type, vrt_packer, @@ -400,31 +398,32 @@ template <typename T> UHD_INLINE T get_context_code( vrt_header_offset_words32, chans_per_otw_buff ); - return num_samps; } //////////////////////////////////////////////////////////////// case uhd::device::SEND_MODE_FULL_BUFF:{ //////////////////////////////////////////////////////////////// - //calculate constants for fragmentation - const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet; - static const size_t first_fragment_index = 0; - const size_t final_fragment_index = num_fragments-1; + size_t total_num_samps_sent = 0; //loop through the following fragment indexes - for (size_t n = first_fragment_index; n <= final_fragment_index; n++){ + while(total_num_samps_sent < total_num_samps){ + + //calculate per-loop-iteration variables + const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent; + const bool first_fragment = (total_num_samps_sent == 0); + const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet); //calculate new flags for the fragments - if_packet_info.has_tsi = metadata.has_time_spec and (n == first_fragment_index); - if_packet_info.has_tsf = metadata.has_time_spec and (n == first_fragment_index); - if_packet_info.sob = metadata.start_of_burst and (n == first_fragment_index); - if_packet_info.eob = metadata.end_of_burst and (n == final_fragment_index); + if_packet_info.has_tsi = metadata.has_time_spec and first_fragment; + if_packet_info.has_tsf = if_packet_info.has_tsi; + if_packet_info.sob = metadata.start_of_burst and first_fragment; + if_packet_info.eob = metadata.end_of_burst and final_fragment; //send the fragment with the helper function - _send1( + const size_t num_samps_sent = _send1( state, - buffs, n*max_samples_per_packet*io_type.size, - (n == final_fragment_index)?(total_num_samps%max_samples_per_packet):max_samples_per_packet, + buffs, total_num_samps_sent*io_type.size, + std::min(total_num_samps_unsent, max_samples_per_packet), if_packet_info, io_type, otw_type, vrt_packer, @@ -432,8 +431,10 @@ template <typename T> UHD_INLINE T get_context_code( vrt_header_offset_words32, chans_per_otw_buff ); + total_num_samps_sent += num_samps_sent; + if (num_samps_sent == 0) return total_num_samps_sent; } - return total_num_samps; + return total_num_samps_sent; } default: throw std::runtime_error("unknown send mode"); diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp index 1fcf846a0..a5a864a04 100644 --- a/host/lib/transport/zero_copy.cpp +++ b/host/lib/transport/zero_copy.cpp @@ -16,32 +16,35 @@ // #include <uhd/transport/zero_copy.hpp> -#include <boost/cstdint.hpp> -#include <boost/function.hpp> -#include <boost/bind.hpp> using namespace uhd::transport; /*********************************************************************** - * The pure-virtual deconstructor needs an implementation to be happy + * Safe managed receive buffer **********************************************************************/ -managed_recv_buffer::~managed_recv_buffer(void){ +static void release_nop(void){ /* NOP */ } -/*********************************************************************** - * Phony zero-copy recv interface implementation - **********************************************************************/ - -//! phony zero-copy recv buffer implementation -class managed_recv_buffer_impl : public managed_recv_buffer{ +class safe_managed_receive_buffer : public managed_recv_buffer{ public: - managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ + safe_managed_receive_buffer( + const boost::asio::const_buffer &buff, + const release_fcn_t &release_fcn + ): + _buff(buff), _release_fcn(release_fcn) + { /* NOP */ } - ~managed_recv_buffer_impl(void){ - delete [] this->cast<const boost::uint8_t *>(); + ~safe_managed_receive_buffer(void){ + _release_fcn(); + } + + void release(void){ + release_fcn_t release_fcn = _release_fcn; + _release_fcn = &release_nop; + return release_fcn(); } private: @@ -50,64 +53,42 @@ private: } const boost::asio::const_buffer _buff; + release_fcn_t _release_fcn; }; -//! phony zero-copy recv interface implementation -struct phony_zero_copy_recv_if::impl{ - impl(size_t max_buff_size) : max_buff_size(max_buff_size){ - /* NOP */ - } - size_t max_buff_size; -}; - -phony_zero_copy_recv_if::phony_zero_copy_recv_if(size_t max_buff_size){ - _impl = UHD_PIMPL_MAKE(impl, (max_buff_size)); -} - -phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){ - /* NOP */ -} - -managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(size_t timeout_ms){ - //allocate memory - boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size]; - - //call recv() with timeout option - ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size), timeout_ms); - - if (num_bytes <= 0) return managed_recv_buffer::sptr(); //NULL sptr - - //create a new managed buffer to house the data - return managed_recv_buffer::sptr( - new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes)) - ); +managed_recv_buffer::sptr managed_recv_buffer::make_safe( + const boost::asio::const_buffer &buff, + const release_fcn_t &release_fcn +){ + return sptr(new safe_managed_receive_buffer(buff, release_fcn)); } /*********************************************************************** - * Phony zero-copy send interface implementation + * Safe managed send buffer **********************************************************************/ +static void commit_nop(size_t){ + /* NOP */ +} -//! phony zero-copy send buffer implementation -class managed_send_buffer_impl : public managed_send_buffer{ +class safe_managed_send_buffer : public managed_send_buffer{ public: - typedef boost::function<ssize_t(const boost::asio::const_buffer &)> send_fcn_t; - - managed_send_buffer_impl( + safe_managed_send_buffer( const boost::asio::mutable_buffer &buff, - const send_fcn_t &send_fcn + const commit_fcn_t &commit_fcn ): - _buff(buff), - _send_fcn(send_fcn) + _buff(buff), _commit_fcn(commit_fcn) { /* NOP */ } - ~managed_send_buffer_impl(void){ - /* NOP */ + ~safe_managed_send_buffer(void){ + _commit_fcn(0); } - ssize_t commit(size_t num_bytes){ - return _send_fcn(boost::asio::buffer(_buff, num_bytes)); + void commit(size_t num_bytes){ + commit_fcn_t commit_fcn = _commit_fcn; + _commit_fcn = &commit_nop; + return commit_fcn(num_bytes); } private: @@ -116,28 +97,12 @@ private: } const boost::asio::mutable_buffer _buff; - const send_fcn_t _send_fcn; -}; - -//! phony zero-copy send interface implementation -struct phony_zero_copy_send_if::impl{ - boost::uint8_t *send_mem; - managed_send_buffer::sptr send_buff; + commit_fcn_t _commit_fcn; }; -phony_zero_copy_send_if::phony_zero_copy_send_if(size_t max_buff_size){ - _impl = UHD_PIMPL_MAKE(impl, ()); - _impl->send_mem = new boost::uint8_t[max_buff_size]; - _impl->send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( - boost::asio::buffer(_impl->send_mem, max_buff_size), - boost::bind(&phony_zero_copy_send_if::send, this, _1) - )); -} - -phony_zero_copy_send_if::~phony_zero_copy_send_if(void){ - delete [] _impl->send_mem; -} - -managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(void){ - return _impl->send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these +safe_managed_send_buffer::sptr managed_send_buffer::make_safe( + const boost::asio::mutable_buffer &buff, + const commit_fcn_t &commit_fcn +){ + return sptr(new safe_managed_send_buffer(buff, commit_fcn)); } diff --git a/host/lib/usrp/dboard/db_tvrx.cpp b/host/lib/usrp/dboard/db_tvrx.cpp index 1cb74c5ae..41c52fbf5 100644 --- a/host/lib/usrp/dboard/db_tvrx.cpp +++ b/host/lib/usrp/dboard/db_tvrx.cpp @@ -58,7 +58,7 @@ static const bool tvrx_debug = false; static const freq_range_t tvrx_freq_range(50e6, 860e6); -static const std::string tvrx_antennas = std::string(""); //only got one +static const prop_names_t tvrx_antennas = list_of(""); //only got one static const uhd::dict<std::string, freq_range_t> tvrx_freq_ranges = map_list_of ("VHFLO", freq_range_t(50e6, 158e6)) @@ -436,7 +436,7 @@ void tvrx::rx_get(const wax::obj &key_, wax::obj &val){ return; case SUBDEV_PROP_ANTENNA: - val = std::string(tvrx_antennas); + val = std::string(""); return; case SUBDEV_PROP_ANTENNA_NAMES: diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index aee760a83..676b1536a 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -34,35 +34,6 @@ using namespace uhd::transport; namespace asio = boost::asio; /*********************************************************************** - * Pseudo send buffer implementation - **********************************************************************/ -class pseudo_managed_send_buffer : public managed_send_buffer{ -public: - - pseudo_managed_send_buffer( - const boost::asio::mutable_buffer &buff, - const boost::function<ssize_t(size_t)> &commit - ): - _buff(buff), - _commit(commit) - { - /* NOP */ - } - - ssize_t commit(size_t num_bytes){ - return _commit(num_bytes); - } - -private: - const boost::asio::mutable_buffer &get(void) const{ - return _buff; - } - - const boost::asio::mutable_buffer _buff; - const boost::function<ssize_t(size_t)> _commit; -}; - -/*********************************************************************** * IO Implementation Details **********************************************************************/ struct usrp1_impl::io_impl{ @@ -94,12 +65,13 @@ struct usrp1_impl::io_impl{ //all of this to ensure only full buffers are committed managed_send_buffer::sptr send_buff; size_t num_bytes_committed; + double send_timeout; boost::uint8_t pseudo_buff[BYTES_PER_PACKET]; - ssize_t phony_commit_pseudo_buff(size_t num_bytes); - ssize_t phony_commit_send_buff(size_t num_bytes); - ssize_t commit_send_buff(void); + void phony_commit_pseudo_buff(size_t num_bytes); + void phony_commit_send_buff(size_t num_bytes); + void commit_send_buff(void); void flush_send_buff(void); - bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &); + bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &, double); //helpers to get at the send buffer + offset inline void *get_send_mem_ptr(void){ @@ -118,28 +90,25 @@ struct usrp1_impl::io_impl{ * The first loop iteration will fill the remainder of the send buffer. * The second loop iteration will empty the pseudo buffer remainder. */ -ssize_t usrp1_impl::io_impl::phony_commit_pseudo_buff(size_t num_bytes){ +void usrp1_impl::io_impl::phony_commit_pseudo_buff(size_t num_bytes){ size_t bytes_to_copy = num_bytes, bytes_copied = 0; while(bytes_to_copy){ size_t bytes_copied_here = std::min(bytes_to_copy, get_send_mem_size()); std::memcpy(get_send_mem_ptr(), pseudo_buff + bytes_copied, bytes_copied_here); - ssize_t ret = phony_commit_send_buff(bytes_copied_here); - if (ret < 0) return ret; - bytes_to_copy -= ret; - bytes_copied += ret; + phony_commit_send_buff(bytes_copied_here); + bytes_to_copy -= bytes_copied_here; + bytes_copied += bytes_copied_here; } - return bytes_copied; } /*! * Accept a commit of num bytes to the send buffer. * Conditionally commit the send buffer if full. */ -ssize_t usrp1_impl::io_impl::phony_commit_send_buff(size_t num_bytes){ +void usrp1_impl::io_impl::phony_commit_send_buff(size_t num_bytes){ num_bytes_committed += num_bytes; - if (num_bytes_committed != send_buff->size()) return num_bytes; - ssize_t ret = commit_send_buff(); - return (ret < 0)? ret : num_bytes; + if (num_bytes_committed != send_buff->size()) return; + commit_send_buff(); } /*! @@ -157,31 +126,31 @@ void usrp1_impl::io_impl::flush_send_buff(void){ * Perform an actual commit on the send buffer: * Commit the contents of the send buffer and request a new buffer. */ -ssize_t usrp1_impl::io_impl::commit_send_buff(void){ - ssize_t ret = send_buff->commit(num_bytes_committed); - send_buff = data_transport->get_send_buff(); +void usrp1_impl::io_impl::commit_send_buff(void){ + send_buff->commit(num_bytes_committed); + send_buff = data_transport->get_send_buff(send_timeout); num_bytes_committed = 0; - return ret; } bool usrp1_impl::io_impl::get_send_buffs( - vrt_packet_handler::managed_send_buffs_t &buffs + vrt_packet_handler::managed_send_buffs_t &buffs, double timeout ){ + send_timeout = timeout; UHD_ASSERT_THROW(buffs.size() == 1); //not enough bytes free -> use the pseudo buffer if (get_send_mem_size() < BYTES_PER_PACKET){ - buffs[0] = managed_send_buffer::sptr(new pseudo_managed_send_buffer( + buffs[0] = managed_send_buffer::make_safe( boost::asio::buffer(pseudo_buff), boost::bind(&usrp1_impl::io_impl::phony_commit_pseudo_buff, this, _1) - )); + ); } //otherwise use the send buffer offset by the bytes written else{ - buffs[0] = managed_send_buffer::sptr(new pseudo_managed_send_buffer( + buffs[0] = managed_send_buffer::make_safe( boost::asio::buffer(get_send_mem_ptr(), get_send_mem_size()), boost::bind(&usrp1_impl::io_impl::phony_commit_send_buff, this, _1) - )); + ); } return buffs[0].get() != NULL; @@ -216,7 +185,7 @@ static void usrp1_bs_vrt_packer( size_t usrp1_impl::send( const std::vector<const void *> &buffs, size_t num_samps, const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode + send_mode_t send_mode, double timeout ){ size_t num_samps_sent = vrt_packet_handler::send( _io_impl->packet_handler_send_state, //last state of the send handler @@ -225,7 +194,7 @@ size_t usrp1_impl::send( io_type, _tx_otw_type, //input and output types to convert _clock_ctrl->get_master_clock_freq(), //master clock tick rate &usrp1_bs_vrt_packer, - boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1), + boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1, timeout), get_max_send_samps_per_packet(), 0, //vrt header offset _tx_subdev_spec.size() //num channels @@ -272,18 +241,18 @@ static void usrp1_bs_vrt_unpacker( } static bool get_recv_buffs( - zero_copy_if::sptr zc_if, size_t timeout_ms, + zero_copy_if::sptr zc_if, double timeout, vrt_packet_handler::managed_recv_buffs_t &buffs ){ UHD_ASSERT_THROW(buffs.size() == 1); - buffs[0] = zc_if->get_recv_buff(timeout_ms); + buffs[0] = zc_if->get_recv_buff(timeout); return buffs[0].get() != NULL; } size_t usrp1_impl::recv( const std::vector<void *> &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, - recv_mode_t recv_mode, size_t timeout_ms + recv_mode_t recv_mode, double timeout ){ size_t num_samps_recvd = vrt_packet_handler::recv( _io_impl->packet_handler_recv_state, //last state of the recv handler @@ -292,7 +261,7 @@ size_t usrp1_impl::recv( io_type, _rx_otw_type, //input and output types to convert _clock_ctrl->get_master_clock_freq(), //master clock tick rate &usrp1_bs_vrt_unpacker, - boost::bind(&get_recv_buffs, _data_transport, timeout_ms, _1), + boost::bind(&get_recv_buffs, _data_transport, timeout, _1), &vrt_packet_handler::handle_overflow_nop, 0, //vrt header offset _rx_subdev_spec.size() //num channels diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp index 793e3027d..6ebd6bb09 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.cpp +++ b/host/lib/usrp/usrp1/usrp1_impl.cpp @@ -83,9 +83,7 @@ static device_addrs_t usrp1_find(const device_addr_t &hint) //find the usrps and load firmware BOOST_FOREACH(usb_device_handle::sptr handle, usb_device_handle::get_device_list(vid, pid)) { - usb_control::sptr ctrl_transport = usb_control::make(handle); - usrp_ctrl::sptr usrp_ctrl = usrp_ctrl::make(ctrl_transport); - usrp_ctrl->usrp_load_firmware(usrp1_fw_image); + usrp_ctrl::make(usb_control::make(handle))->usrp_load_firmware(usrp1_fw_image); } //get descriptors again with serial number, but using the initialized VID/PID now since we have firmware @@ -93,13 +91,13 @@ static device_addrs_t usrp1_find(const device_addr_t &hint) pid = USRP1_PRODUCT_ID; BOOST_FOREACH(usb_device_handle::sptr handle, usb_device_handle::get_device_list(vid, pid)) { - device_addr_t new_addr; - new_addr["type"] = "usrp1"; - new_addr["serial"] = handle->get_serial(); - //this is a found usrp1 when a hint serial is not specified or it matches - if (not hint.has_key("serial") or hint["serial"] == new_addr["serial"]){ - usrp1_addrs.push_back(new_addr); - } + device_addr_t new_addr; + new_addr["type"] = "usrp1"; + new_addr["serial"] = handle->get_serial(); + //this is a found usrp1 when a hint serial is not specified or it matches + if (not hint.has_key("serial") or hint["serial"] == new_addr["serial"]){ + usrp1_addrs.push_back(new_addr); + } } return usrp1_addrs; @@ -109,12 +107,12 @@ static device_addrs_t usrp1_find(const device_addr_t &hint) * Make **********************************************************************/ template<typename output_type> static output_type cast_from_dev_addr( - const device_addr_t &device_addr, - const std::string &key, - output_type def_val + const device_addr_t &device_addr, + const std::string &key, + output_type def_val ){ - return (device_addr.has_key(key))? - boost::lexical_cast<output_type>(device_addr[key]) : def_val; + return (device_addr.has_key(key))? + boost::lexical_cast<output_type>(device_addr[key]) : def_val; } static device::sptr usrp1_make(const device_addr_t &device_addr) @@ -211,9 +209,9 @@ usrp1_impl::~usrp1_impl(void){ /* NOP */ } -bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, size_t timeout_ms){ +bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, double timeout){ //dummy fill-in for the recv_async_msg - boost::this_thread::sleep(boost::posix_time::milliseconds(timeout_ms)); + boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6))); return false; } diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index 20ae3c02a..f2c464610 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -83,13 +83,12 @@ public: size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, - send_mode_t); + send_mode_t, double); size_t recv(const std::vector<void *> &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, - recv_mode_t, - size_t timeout); + recv_mode_t, double); static const size_t BYTES_PER_PACKET = 512*4; //under the transfer size @@ -101,7 +100,7 @@ public: return BYTES_PER_PACKET/_rx_otw_type.get_sample_size()/_rx_subdev_spec.size(); } - bool recv_async_msg(uhd::async_metadata_t &, size_t); + bool recv_async_msg(uhd::async_metadata_t &, double); private: /*! diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 3395f94e2..33cec3cbc 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -33,7 +33,6 @@ using namespace uhd::transport; namespace asio = boost::asio; static const int underflow_flags = async_metadata_t::EVENT_CODE_UNDERFLOW | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET; -static const double RECV_TIMEOUT_MS = 100; /*********************************************************************** * io impl details (internal to this file) @@ -59,9 +58,9 @@ struct usrp2_impl::io_impl{ recv_pirate_crew.join_all(); } - bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, size_t timeout_ms){ + bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw - return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(timeout_ms)); + return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout); } //state management for the vrt packet handler code @@ -91,7 +90,7 @@ void usrp2_impl::io_impl::recv_pirate_loop( size_t next_packet_seq = 0; while(recv_pirate_crew_raiding){ - managed_recv_buffer::sptr buff = zc_if->get_recv_buff(RECV_TIMEOUT_MS); + managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); if (not buff.get()) continue; //ignore timeout/error buffers try{ @@ -151,7 +150,7 @@ void usrp2_impl::io_init(void){ std::memcpy(send_buff->cast<void*>(), &data, sizeof(data)); send_buff->commit(sizeof(data)); //drain the recv buffers (may have junk) - while (data_transport->get_recv_buff(RECV_TIMEOUT_MS).get()){}; + while (data_transport->get_recv_buff().get()){}; } //the number of recv frames is the number for the first transport @@ -169,22 +168,16 @@ void usrp2_impl::io_init(void){ _mboards.at(i), i )); } - - std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; - std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl; - std::cout << "Recv pirate num frames: " << num_frames << std::endl; } /*********************************************************************** * Async Data **********************************************************************/ bool usrp2_impl::recv_async_msg( - async_metadata_t &async_metadata, size_t timeout_ms + async_metadata_t &async_metadata, double timeout ){ boost::this_thread::disable_interruption di; //disable because the wait can throw - return _io_impl->async_msg_fifo->pop_with_timed_wait( - async_metadata, boost::posix_time::milliseconds(timeout_ms) - ); + return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout); } /*********************************************************************** @@ -192,19 +185,22 @@ bool usrp2_impl::recv_async_msg( **********************************************************************/ static bool get_send_buffs( const std::vector<udp_zero_copy::sptr> &trans, - vrt_packet_handler::managed_send_buffs_t &buffs + vrt_packet_handler::managed_send_buffs_t &buffs, + double timeout ){ UHD_ASSERT_THROW(trans.size() == buffs.size()); + bool good = true; for (size_t i = 0; i < buffs.size(); i++){ - buffs[i] = trans[i]->get_send_buff(); + buffs[i] = trans[i]->get_send_buff(timeout); + good = good and (buffs[i].get() != NULL); } - return true; + return good; } size_t usrp2_impl::send( const std::vector<const void *> &buffs, size_t num_samps, const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode + send_mode_t send_mode, double timeout ){ return vrt_packet_handler::send( _io_impl->packet_handler_send_state, //last state of the send handler @@ -213,7 +209,7 @@ size_t usrp2_impl::send( io_type, _io_helper.get_tx_otw_type(), //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_pack_be, - boost::bind(&get_send_buffs, _data_transports, _1), + boost::bind(&get_send_buffs, _data_transports, _1, timeout), get_max_send_samps_per_packet() ); } @@ -224,7 +220,7 @@ size_t usrp2_impl::send( size_t usrp2_impl::recv( const std::vector<void *> &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, - recv_mode_t recv_mode, size_t timeout_ms + recv_mode_t recv_mode, double timeout ){ return vrt_packet_handler::recv( _io_impl->packet_handler_recv_state, //last state of the recv handler @@ -233,6 +229,6 @@ size_t usrp2_impl::recv( io_type, _io_helper.get_rx_otw_type(), //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_unpack_be, - boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout_ms) + boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout) ); } diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 157d17057..e8763b284 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -234,7 +234,7 @@ public: size_t send( const std::vector<const void *> &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, - uhd::device::send_mode_t + uhd::device::send_mode_t, double ); size_t get_max_recv_samps_per_packet(void) const{ return _io_helper.get_max_recv_samps_per_packet(); @@ -242,9 +242,9 @@ public: size_t recv( const std::vector<void *> &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, - uhd::device::recv_mode_t, size_t + uhd::device::recv_mode_t, double ); - bool recv_async_msg(uhd::async_metadata_t &, size_t); + bool recv_async_msg(uhd::async_metadata_t &, double); private: //device properties interface diff --git a/host/lib/version.cpp b/host/lib/version.cpp index 5edbca09b..93fdecb1a 100644 --- a/host/lib/version.cpp +++ b/host/lib/version.cpp @@ -21,3 +21,17 @@ std::string uhd::get_version_string(void){ return UHD_VERSION_STRING; } + +#include <uhd/utils/static.hpp> +#include <boost/version.hpp> +#include <iostream> + +UHD_STATIC_BLOCK(print_system_info){ + std::cout + << BOOST_PLATFORM << "; " + << BOOST_COMPILER << "; " + << "Boost_" << BOOST_VERSION << "; " + << "UHD_" << uhd::get_version_string() + << std::endl << std::endl + ; +} |