diff options
Diffstat (limited to 'host/lib')
22 files changed, 586 insertions, 409 deletions
diff --git a/host/lib/convert/convert_common.hpp b/host/lib/convert/convert_common.hpp index 1a653a56f..c6ba1fcf9 100644 --- a/host/lib/convert/convert_common.hpp +++ b/host/lib/convert/convert_common.hpp @@ -41,8 +41,10 @@ /*********************************************************************** * Typedefs **********************************************************************/ +typedef std::complex<double> fc64_t; typedef std::complex<float> fc32_t; typedef std::complex<boost::int16_t> sc16_t; +typedef std::complex<boost::int8_t> sc8_t; typedef boost::uint32_t item32_t; /*********************************************************************** @@ -87,4 +89,27 @@ static UHD_INLINE fc32_t item32_to_fc32(item32_t item){ ); } +/*********************************************************************** + * Convert complex double buffer to items32 (no swap) + **********************************************************************/ +static const double shorts_per_double = double(32767); + +static UHD_INLINE item32_t fc64_to_item32(fc64_t num){ + boost::uint16_t real = boost::int16_t(num.real()*shorts_per_double); + boost::uint16_t imag = boost::int16_t(num.imag()*shorts_per_double); + return (item32_t(real) << 16) | (item32_t(imag) << 0); +} + +/*********************************************************************** + * Convert items32 buffer to complex double + **********************************************************************/ +static const double doubles_per_short = double(1.0/shorts_per_double); + +static UHD_INLINE fc64_t item32_to_fc64(item32_t item){ + return fc64_t( + float(boost::int16_t(item >> 16)*doubles_per_short), + float(boost::int16_t(item >> 0)*doubles_per_short) + ); +} + #endif /* INCLUDED_LIBUHD_CONVERT_COMMON_HPP */ diff --git a/host/lib/convert/gen_convert_general.py b/host/lib/convert/gen_convert_general.py index 47c4cd7d0..f03448047 100644 --- a/host/lib/convert/gen_convert_general.py +++ b/host/lib/convert/gen_convert_general.py @@ -85,7 +85,7 @@ if __name__ == '__main__': output = parse_tmpl(TMPL_HEADER, file=file) for width in 1, 2, 3, 4: for swap, swap_fcn in (('nswap', ''), ('bswap', 'uhd::byteswap')): - for cpu_type in 'fc32', 'sc16': + for cpu_type in 'fc64', 'fc32', 'sc16': output += parse_tmpl( TMPL_CONV_TO_FROM_ITEM32_1 if width == 1 else TMPL_CONV_TO_FROM_ITEM32_X, width=width, swap=swap, swap_fcn=swap_fcn, cpu_type=cpu_type diff --git a/host/lib/convert/gen_convert_pred.py b/host/lib/convert/gen_convert_pred.py index 1d573bf1a..d2f90bf41 100644 --- a/host/lib/convert/gen_convert_pred.py +++ b/host/lib/convert/gen_convert_pred.py @@ -21,8 +21,6 @@ TMPL_TEXT = """ /*********************************************************************** * This file was generated by $file on $time.strftime("%c") **********************************************************************/ -typedef size_t pred_type; - \#include <boost/tokenizer.hpp> \#include <boost/lexical_cast.hpp> \#include <boost/detail/endian.hpp> @@ -31,6 +29,9 @@ typedef size_t pred_type; \#include <string> \#include <vector> +typedef size_t pred_type; +typedef std::vector<pred_type> pred_vector_type; + enum dir_type{ DIR_OTW_TO_CPU = 0, DIR_CPU_TO_OTW = 1 @@ -69,8 +70,10 @@ pred_type make_pred(const std::string &markup, dir_type &dir){ dir = DIR_OTW_TO_CPU; } - if (cpu_type == "fc32") pred |= $ph.fc32_p; + if (cpu_type == "fc64") pred |= $ph.fc64_p; + else if (cpu_type == "fc32") pred |= $ph.fc32_p; else if (cpu_type == "sc16") pred |= $ph.sc16_p; + else if (cpu_type == "sc8") pred |= $ph.sc8_p; else throw pred_error("unhandled io type " + cpu_type); if (otw_type == "item32") pred |= $ph.item32_p; @@ -99,44 +102,60 @@ pred_type make_pred(const std::string &markup, dir_type &dir){ return pred; } +#define pred_table_wildcard pred_type(~0) +#define pred_table_max_size size_t(128) +#define pred_table_index(e) (pred_type(e) & 0x7f) + +static pred_vector_type get_pred_byte_order_table(void){ + pred_vector_type table(pred_table_max_size, pred_table_wildcard); + \#ifdef BOOST_BIG_ENDIAN + table[pred_table_index(otw_type_t::BO_BIG_ENDIAN)] = $ph.nswap_p; + table[pred_table_index(otw_type_t::BO_LITTLE_ENDIAN)] = $ph.bswap_p; + \#else + table[pred_table_index(otw_type_t::BO_BIG_ENDIAN)] = $ph.bswap_p; + table[pred_table_index(otw_type_t::BO_LITTLE_ENDIAN)] = $ph.nswap_p; + \#endif + table[pred_table_index(otw_type_t::BO_NATIVE)] = $ph.nswap_p; + return table; +} + +static pred_vector_type get_pred_io_type_table(void){ + pred_vector_type table(pred_table_max_size, pred_table_wildcard); + table[pred_table_index(io_type_t::COMPLEX_FLOAT64)] = $ph.fc64_p; + table[pred_table_index(io_type_t::COMPLEX_FLOAT32)] = $ph.fc32_p; + table[pred_table_index(io_type_t::COMPLEX_INT16)] = $ph.sc16_p; + return table; +} + +static pred_vector_type get_pred_num_io_table(void){ + pred_vector_type table(pred_table_max_size, pred_table_wildcard); + table[1] = $ph.chan1_p; + table[2] = $ph.chan2_p; + table[3] = $ph.chan3_p; + table[4] = $ph.chan4_p; + return table; +} + UHD_INLINE pred_type make_pred( const io_type_t &io_type, const otw_type_t &otw_type, size_t num_inputs, size_t num_outputs ){ - pred_type pred = 0; + pred_type pred = $ph.item32_p; //only item32 supported as of now - switch(otw_type.byteorder){ - \#ifdef BOOST_BIG_ENDIAN - case otw_type_t::BO_BIG_ENDIAN: pred |= $ph.nswap_p; break; - case otw_type_t::BO_LITTLE_ENDIAN: pred |= $ph.bswap_p; break; - \#else - case otw_type_t::BO_BIG_ENDIAN: pred |= $ph.bswap_p; break; - case otw_type_t::BO_LITTLE_ENDIAN: pred |= $ph.nswap_p; break; - \#endif - case otw_type_t::BO_NATIVE: pred |= $ph.nswap_p; break; - default: throw pred_error("unhandled otw byteorder type"); - } + static const pred_vector_type pred_byte_order_table(get_pred_byte_order_table()); + pred |= pred_byte_order_table[pred_table_index(otw_type.byteorder)]; - switch(otw_type.get_sample_size()){ - case sizeof(boost::uint32_t): pred |= $ph.item32_p; break; - default: throw pred_error("unhandled otw sample size"); - } + static const pred_vector_type pred_io_type_table(get_pred_io_type_table()); + pred |= pred_io_type_table[pred_table_index(io_type.tid)]; - switch(io_type.tid){ - case io_type_t::COMPLEX_FLOAT32: pred |= $ph.fc32_p; break; - case io_type_t::COMPLEX_INT16: pred |= $ph.sc16_p; break; - default: throw pred_error("unhandled io type id"); - } + static const pred_vector_type pred_num_io_table(get_pred_num_io_table()); + pred |= pred_num_io_table[pred_table_index(num_inputs*num_outputs)]; - switch(num_inputs*num_outputs){ //FIXME treated as one value - case 1: pred |= $ph.chan1_p; break; - case 2: pred |= $ph.chan2_p; break; - case 3: pred |= $ph.chan3_p; break; - case 4: pred |= $ph.chan4_p; break; - default: throw pred_error("unhandled number of channels"); - } + if (pred == pred_table_wildcard) throw pred_error( + "unhanded input combination for make_pred()" + ); return pred; } @@ -150,12 +169,14 @@ class ph: bswap_p = 0b00001 nswap_p = 0b00000 item32_p = 0b00000 + sc8_p = 0b00000 sc16_p = 0b00010 - fc32_p = 0b00000 + fc32_p = 0b00100 + fc64_p = 0b00110 chan1_p = 0b00000 - chan2_p = 0b00100 - chan3_p = 0b01000 - chan4_p = 0b01100 + chan2_p = 0b01000 + chan3_p = 0b10000 + chan4_p = 0b11000 if __name__ == '__main__': import sys, os diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index dbe026ba3..3ba562d68 100755 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2010 Ettus Research LLC +# Copyright 2010-2011 Ettus Research LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -35,6 +35,7 @@ TMPL_TEXT = """ \#include <uhd/utils/byteswap.hpp> \#include <boost/detail/endian.hpp> \#include <stdexcept> +\#include <vector> //define the endian macros to convert integers \#ifdef BOOST_BIG_ENDIAN @@ -48,18 +49,26 @@ TMPL_TEXT = """ using namespace uhd; using namespace uhd::transport; -######################################################################## -#def gen_code($XE_MACRO, $suffix) -######################################################################## +typedef size_t pred_type; +typedef std::vector<pred_type> pred_table_type; +#define pred_table_index(hdr) ((hdr >> 20) & 0x1ff) + +static pred_table_type get_pred_unpack_table(void){ + pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28) + for (size_t i = 0; i < table.size(); i++){ + boost::uint32_t vrt_hdr_word = i << 20; + if(vrt_hdr_word & $hex(0x1 << 28)) table[i] |= $hex($sid_p); + if(vrt_hdr_word & $hex(0x1 << 27)) table[i] |= $hex($cid_p); + if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p); + if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p); + if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p); + } + return table; +} ######################################################################## -## setup predicates +#def gen_code($XE_MACRO, $suffix) ######################################################################## -#set $sid_p = 0b00001 -#set $cid_p = 0b00010 -#set $tsi_p = 0b00100 -#set $tsf_p = 0b01000 -#set $tlr_p = 0b10000 void vrt::if_hdr_pack_$(suffix)( boost::uint32_t *packet_buff, @@ -67,7 +76,7 @@ void vrt::if_hdr_pack_$(suffix)( ){ boost::uint32_t vrt_hdr_flags = 0; - boost::uint8_t pred = 0; + pred_type pred = 0; if (if_packet_info.has_sid) pred |= $hex($sid_p); if (if_packet_info.has_cid) pred |= $hex($cid_p); if (if_packet_info.has_tsi) pred |= $hex($tsi_p); @@ -159,12 +168,8 @@ void vrt::if_hdr_unpack_$(suffix)( //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented - boost::uint8_t pred = 0; - if(vrt_hdr_word & $hex(0x1 << 28)) pred |= $hex($sid_p); - if(vrt_hdr_word & $hex(0x1 << 27)) pred |= $hex($cid_p); - if(vrt_hdr_word & $hex(0x3 << 22)) pred |= $hex($tsi_p); - if(vrt_hdr_word & $hex(0x3 << 20)) pred |= $hex($tsf_p); - if(vrt_hdr_word & $hex(0x1 << 26)) pred |= $hex($tlr_p); + static const pred_table_type pred_unpack_table(get_pred_unpack_table()); + const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)]; switch(pred){ #for $pred in range(2**5) @@ -200,7 +205,7 @@ void vrt::if_hdr_unpack_$(suffix)( if_packet_info.has_tsf = true; if_packet_info.tsf = boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 32; #set $num_header_words += 1 - if_packet_info.tsf |= boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 0; + if_packet_info.tsf |= $(XE_MACRO)(packet_buff[$num_header_words]); #set $num_header_words += 1 #else if_packet_info.has_tsf = false; @@ -239,4 +244,12 @@ def parse_tmpl(_tmpl_text, **kwargs): if __name__ == '__main__': import sys - open(sys.argv[1], 'w').write(parse_tmpl(TMPL_TEXT, file=__file__)) + open(sys.argv[1], 'w').write(parse_tmpl( + TMPL_TEXT, + file=__file__, + sid_p = 0b00001, + cid_p = 0b00010, + tsi_p = 0b00100, + tsf_p = 0b01000, + tlr_p = 0b10000, + )) diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 311a8953b..ca37f351f 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -23,7 +23,6 @@ #include <uhd/utils/assert.hpp> #include <boost/foreach.hpp> #include <boost/thread.hpp> -#include <boost/enable_shared_from_this.hpp> #include <vector> #include <iostream> @@ -99,8 +98,7 @@ private: bool _input; //! hold a bounded buffer of completed transfers - typedef bounded_buffer<libusb_transfer *> lut_buff_type; - lut_buff_type::sptr _completed_list; + bounded_buffer<libusb_transfer *> _completed_list; //! a list of all transfer structs we allocated std::vector<libusb_transfer *> _all_luts; @@ -134,7 +132,7 @@ static void callback(libusb_transfer *lut){ * \param pointer to libusb_transfer */ void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ - _completed_list->push_with_wait(lut); + _completed_list.push_with_wait(lut); } @@ -153,9 +151,9 @@ usb_endpoint::usb_endpoint( ): _handle(handle), _endpoint(endpoint), - _input(input) + _input(input), + _completed_list(num_transfers) { - _completed_list = lut_buff_type::make(num_transfers); _buffer_pool = buffer_pool::make(num_transfers, transfer_size); for (size_t i = 0; i < num_transfers; i++){ _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size)); @@ -163,7 +161,7 @@ usb_endpoint::usb_endpoint( //input luts are immediately submitted to be filled //output luts go into the completed list as free buffers if (_input) this->submit(_all_luts.back()); - else _completed_list->push_with_wait(_all_luts.back()); + else _completed_list.push_with_wait(_all_luts.back()); } } @@ -272,15 +270,15 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){ libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw - libusb_transfer *lut; - if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut; + libusb_transfer *lut = NULL; + if (_completed_list.pop_with_timed_wait(lut, timeout)) return lut; return NULL; } /*********************************************************************** * USB zero_copy device class **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> { +class libusb_zero_copy_impl : public usb_zero_copy{ public: libusb_zero_copy_impl( @@ -400,8 +398,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ } else { return managed_recv_buffer::make_safe( - boost::asio::const_buffer(lut->buffer, lut->actual_length), - boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) + lut->buffer, lut->actual_length, + boost::bind(&libusb_zero_copy_impl::release, this, lut) ); } } @@ -420,8 +418,8 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ } else { return managed_send_buffer::make_safe( - boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()), - boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) + lut->buffer, this->get_send_frame_size(), + boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1) ); } } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 697e172cd..87c5ec823 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -19,38 +19,99 @@ #include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/thread_priority.hpp> #include <uhd/utils/assert.hpp> #include <uhd/utils/warning.hpp> #include <boost/asio.hpp> #include <boost/format.hpp> -#include <boost/thread/thread.hpp> -#include <boost/enable_shared_from_this.hpp> #include <iostream> +#include <vector> using namespace uhd; using namespace uhd::transport; namespace asio = boost::asio; -//Define this to the the boost async io calls to perform receive. -//Otherwise, get_recv_buff uses a blocking receive with timeout. -#define USE_ASIO_ASYNC_RECV +//A reasonable number of frames for send/recv and async/sync +static const size_t DEFAULT_NUM_FRAMES = 32; -//Define this to the the boost async io calls to perform send. -//Otherwise, the commit callback uses a blocking send. -//#define USE_ASIO_ASYNC_SEND +/*********************************************************************** + * Reusable managed receiver buffer: + * - Initialize with memory and a release callback. + * - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +public: + typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr; + typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type; -//The number of service threads to spawn for async ASIO: -//A single concurrent thread for io_service seems to be the fastest. -//Threads are disabled when no async implementations are enabled. -#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND) -static const size_t CONCURRENCY_HINT = 1; -#else -static const size_t CONCURRENCY_HINT = 0; -#endif + udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): + _mem(mem), _release_cb(release_cb){/* NOP */} -//A reasonable number of frames for send/recv and async/sync -static const size_t DEFAULT_NUM_FRAMES = 32; + void release(void){ + if (_expired) return; + this->_release_cb(this); + _expired = true; + } + + sptr get_new(size_t len){ + _expired = false; + _len = len; + return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter); + } + + template <class T> T cast(void) const{return static_cast<T>(_mem);} + +private: + static void fake_deleter(void *obj){ + static_cast<udp_zero_copy_asio_mrb *>(obj)->release(); + } + + const void *get_buff(void) const{return _mem;} + size_t get_size(void) const{return _len;} + + bool _expired; + void *_mem; + size_t _len; + release_cb_type _release_cb; +}; + +/*********************************************************************** + * Reusable managed send buffer: + * - Initialize with memory and a commit callback. + * - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_msb : public managed_send_buffer{ +public: + typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr; + typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type; + + udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): + _mem(mem), _commit_cb(commit_cb){/* NOP */} + + void commit(size_t len){ + if (_expired) return; + this->_commit_cb(this, len); + _expired = true; + } + + sptr get_new(size_t len){ + _expired = false; + _len = len; + return sptr(this, &udp_zero_copy_asio_msb::fake_deleter); + } + +private: + static void fake_deleter(void *obj){ + static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0); + } + + void *get_buff(void) const{return _mem;} + size_t get_size(void) const{return _len;} + + bool _expired; + void *_mem; + size_t _len; + commit_cb_type _commit_cb; +}; /*********************************************************************** * Zero Copy UDP implementation with ASIO: @@ -59,7 +120,7 @@ static const size_t DEFAULT_NUM_FRAMES = 32; * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> { +class udp_zero_copy_asio_impl : public udp_zero_copy{ public: typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; @@ -72,8 +133,9 @@ public: _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))), _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))), _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), - _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)), - _io_service(_concurrency_hint) + _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), + _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), + _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames) { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -87,39 +149,28 @@ public: _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); _sock_fd = _socket->native(); - } - - ~udp_zero_copy_asio_impl(void){ - delete _work; //allow io_service run to complete - _thread_group.join_all(); //wait for service threads to exit - delete _socket; - } - void init(void){ - //allocate all recv frames and release them to begin xfers - _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); - _recv_buffer_pool = buffer_pool::make(_num_recv_frames, _recv_frame_size); - for (size_t i = 0; i < _num_recv_frames; i++){ - release(_recv_buffer_pool->at(i)); + //allocate re-usable managed receive buffers + for (size_t i = 0; i < get_num_recv_frames(); i++){ + _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr( + new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), + boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) + )); + handle_recv(_mrb_pool.back().get()); } - //allocate all send frames and push them into the fifo - _pending_send_buffs = pending_buffs_type::make(_num_send_frames); - _send_buffer_pool = buffer_pool::make(_num_send_frames, _send_frame_size); - for (size_t i = 0; i < _num_send_frames; i++){ - handle_send(_send_buffer_pool->at(i)); + //allocate re-usable managed send buffers + for (size_t i = 0; i < get_num_send_frames(); i++){ + _msb_pool.push_back(udp_zero_copy_asio_msb::sptr( + new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), + boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) + )); + handle_send(_msb_pool.back().get()); } - - //spawn the service threads that will run the io service - _work = new asio::io_service::work(_io_service); //new work to delete later - for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread( - boost::bind(&udp_zero_copy_asio_impl::service, this) - ); } - void service(void){ - set_thread_priority_safe(); - _io_service.run(); + ~udp_zero_copy_asio_impl(void){ + delete _socket; } //get size for internal socket buffer @@ -136,50 +187,19 @@ public: return get_buff_size<Opt>(); } - //! handle a recv callback -> push the filled memory into the fifo - UHD_INLINE void handle_recv(void *mem, size_t len){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); - } - - //////////////////////////////////////////////////////////////////// - #ifdef USE_ASIO_ASYNC_RECV - //////////////////////////////////////////////////////////////////// - //! pop a filled recv buffer off of the fifo and bind with the release callback - managed_recv_buffer::sptr get_recv_buff(double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - asio::mutable_buffer buff; - if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ - return managed_recv_buffer::make_safe( - buff, boost::bind( - &udp_zero_copy_asio_impl::release, - shared_from_this(), - asio::buffer_cast<void*>(buff) - ) - ); - } - return managed_recv_buffer::sptr(); - } - - //! release a recv buffer -> start an async recv on the buffer - void release(void *mem){ - _socket->async_receive( - boost::asio::buffer(mem, this->get_recv_frame_size()), - boost::bind( - &udp_zero_copy_asio_impl::handle_recv, - shared_from_this(), mem, - asio::placeholders::bytes_transferred - ) - ); - } - - //////////////////////////////////////////////////////////////////// - #else /*USE_ASIO_ASYNC_RECV*/ - //////////////////////////////////////////////////////////////////// - managed_recv_buffer::sptr get_recv_buff(double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - asio::mutable_buffer buff; - + /******************************************************************* + * Receive implementation: + * + * Use select to perform a blocking receive with timeout. + * Return the managed receive buffer with the new length. + * When the caller is finished with the managed buffer, + * the managed receive buffer is released back into the queue. + * + * Assumptions: + * - A managed buffer is always available. + * - The queue can never be over-filled. + ******************************************************************/ + UHD_INLINE bool is_recv_socket_ready(double timeout){ //setup timeval for timeout timeval tv; tv.tv_sec = 0; @@ -190,104 +210,74 @@ public: FD_ZERO(&rset); FD_SET(_sock_fd, &rset); - //call select to perform timed wait and grab an available buffer with wait - //if the condition is true, call receive and return the managed buffer - if ( - ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and - _pending_recv_buffs->pop_with_timed_wait(buff, timeout) - ){ - return managed_recv_buffer::make_safe( - asio::buffer( - boost::asio::buffer_cast<void *>(buff), - _socket->receive(asio::buffer(buff)) - ), - boost::bind( - &udp_zero_copy_asio_impl::release, - shared_from_this(), - asio::buffer_cast<void*>(buff) - ) - ); + //call select with timeout on receive socket + return ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0; + } + + managed_recv_buffer::sptr get_recv_buff(double timeout){ + udp_zero_copy_asio_mrb *mrb = NULL; + if (is_recv_socket_ready(timeout) and _pending_recv_buffs.pop_with_haste(mrb)){ + return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0)); } return managed_recv_buffer::sptr(); } - void release(void *mem){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - handle_recv(mem, this->get_recv_frame_size()); + UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ + _pending_recv_buffs.push_with_pop_on_full(mrb); } - //////////////////////////////////////////////////////////////////// - #endif /*USE_ASIO_ASYNC_RECV*/ - //////////////////////////////////////////////////////////////////// + void release(udp_zero_copy_asio_mrb *mrb){ + handle_recv(mrb); + } size_t get_num_recv_frames(void) const {return _num_recv_frames;} size_t get_recv_frame_size(void) const {return _recv_frame_size;} - //! handle a send callback -> push the emptied memory into the fifo - UHD_INLINE void handle_send(void *mem){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size())); - } - - //! pop an empty send buffer off of the fifo and bind with the commit callback - managed_send_buffer::sptr get_send_buff(double timeout){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - asio::mutable_buffer buff; - if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ - return managed_send_buffer::make_safe( - buff, boost::bind( - &udp_zero_copy_asio_impl::commit, - shared_from_this(), - asio::buffer_cast<void*>(buff), _1 - ) - ); + /******************************************************************* + * Send implementation: + * + * Get a managed receive buffer immediately with max length set. + * The caller will fill the buffer and commit it when finished. + * The commit routine will perform a blocking send operation, + * and push the managed send buffer back into the queue. + * + * Assumptions: + * - A managed buffer is always available. + * - The queue can never be over-filled. + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double){ + udp_zero_copy_asio_msb *msb = NULL; + if (_pending_send_buffs.pop_with_haste(msb)){ + return msb->get_new(_send_frame_size); } return managed_send_buffer::sptr(); } - //////////////////////////////////////////////////////////////////// - #ifdef USE_ASIO_ASYNC_SEND - //////////////////////////////////////////////////////////////////// - //! commit a send buffer -> start an async send on the buffer - void commit(void *mem, size_t len){ - _socket->async_send( - boost::asio::buffer(mem, len), - boost::bind( - &udp_zero_copy_asio_impl::handle_send, - shared_from_this(), mem - ) - ); + UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ + _pending_send_buffs.push_with_pop_on_full(msb); } - //////////////////////////////////////////////////////////////////// - #else /*USE_ASIO_ASYNC_SEND*/ - //////////////////////////////////////////////////////////////////// - void commit(void *mem, size_t len){ - _socket->send(asio::buffer(mem, len)); - handle_send(mem); + void commit(udp_zero_copy_asio_msb *msb, size_t len){ + ::send(_sock_fd, msb->cast<const char *>(), len, 0); + handle_send(msb); } - //////////////////////////////////////////////////////////////////// - #endif /*USE_ASIO_ASYNC_SEND*/ - //////////////////////////////////////////////////////////////////// - size_t get_num_send_frames(void) const {return _num_send_frames;} size_t get_send_frame_size(void) const {return _send_frame_size;} private: //memory management -> buffers and fifos - boost::thread_group _thread_group; - buffer_pool::sptr _send_buffer_pool, _recv_buffer_pool; - typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type; - pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; + buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; + bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; + bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; + std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool; + std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool; //asio guts -> socket and service - size_t _concurrency_hint; asio::io_service _io_service; asio::ip::udp::socket *_socket; - asio::io_service::work *_work; int _sock_fd; }; @@ -340,7 +330,5 @@ udp_zero_copy::sptr udp_zero_copy::make( resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send"); - udp_trans->init(); //buffers resized -> call init() to use - return udp_trans; } diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index c535edd04..795d5bc62 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -67,13 +67,17 @@ template <typename T> UHD_INLINE T get_context_code( std::vector<const boost::uint8_t *> copy_buffs; size_t size_of_copy_buffs; size_t fragment_offset_in_samps; + std::vector<void *> io_buffs; + uhd::convert::input_type otw_buffs; recv_state(size_t width = 1): width(width), managed_buffs(width), copy_buffs(width, NULL), size_of_copy_buffs(0), - fragment_offset_in_samps(0) + fragment_offset_in_samps(0), + io_buffs(0), //resized later + otw_buffs(1) //always 1 for now { /* NOP */ } @@ -144,7 +148,7 @@ template <typename T> UHD_INLINE T get_context_code( ******************************************************************/ static UHD_INLINE size_t _recv1( recv_state &state, - const std::vector<void *> &buffs, + const uhd::device::recv_buffs_type &buffs, size_t offset_bytes, size_t total_samps, uhd::rx_metadata_t &metadata, @@ -192,17 +196,16 @@ template <typename T> UHD_INLINE T get_context_code( size_t bytes_to_copy = nsamps_to_copy*bytes_per_item; size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff; - std::vector<void *> io_buffs(chans_per_otw_buff); - for (size_t i = 0; i < state.width; i+=chans_per_otw_buff){ + for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ //fill a vector with pointers to the io buffers for (size_t j = 0; j < chans_per_otw_buff; j++){ - io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes; + state.io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes; } //copy-convert the samples from the recv buffer - uhd::convert::input_type otw_buffs(1, state.copy_buffs[i]); - converter(otw_buffs, io_buffs, nsamps_to_copy_per_io_buff); + state.otw_buffs[0] = state.copy_buffs[i]; + converter(state.otw_buffs, state.io_buffs, nsamps_to_copy_per_io_buff); //update the rx copy buffer to reflect the bytes copied state.copy_buffs[i] += bytes_to_copy; @@ -223,7 +226,7 @@ template <typename T> UHD_INLINE T get_context_code( ******************************************************************/ static UHD_INLINE size_t recv( recv_state &state, - const std::vector<void *> &buffs, + const uhd::device::recv_buffs_type &buffs, const size_t total_num_samps, uhd::rx_metadata_t &metadata, uhd::device::recv_mode_t recv_mode, @@ -236,6 +239,8 @@ template <typename T> UHD_INLINE T get_context_code( size_t vrt_header_offset_words32 = 0, size_t chans_per_otw_buff = 1 ){ + state.io_buffs.resize(chans_per_otw_buff); + uhd::convert::function_type converter( uhd::convert::get_converter_otw_to_cpu( io_type, otw_type, 1, chans_per_otw_buff @@ -300,8 +305,20 @@ template <typename T> UHD_INLINE T get_context_code( struct send_state{ //init the expected seq number size_t next_packet_seq; - - send_state(void) : next_packet_seq(0){ + managed_send_buffs_t managed_buffs; + const boost::uint64_t zeros; + std::vector<const void *> zero_buffs; + std::vector<const void *> io_buffs; + uhd::convert::output_type otw_buffs; + + send_state(size_t width = 1): + next_packet_seq(0), + managed_buffs(width), + zeros(0), + zero_buffs(width, &zeros), + io_buffs(0), //resized later + otw_buffs(1) //always 1 for now + { /* NOP */ } }; @@ -312,7 +329,7 @@ template <typename T> UHD_INLINE T get_context_code( ******************************************************************/ static UHD_INLINE size_t _send1( send_state &state, - const std::vector<const void *> &buffs, + const uhd::device::send_buffs_type &buffs, const size_t offset_bytes, const size_t num_samps, uhd::transport::vrt::if_packet_info_t &if_packet_info, @@ -326,29 +343,27 @@ template <typename T> UHD_INLINE T get_context_code( if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*OTW_BYTES_PER_SAMP)/sizeof(boost::uint32_t); if_packet_info.packet_count = state.next_packet_seq; - //get send buffers for each channel - managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); - if (not get_send_buffs(send_buffs)) return 0; + //get send buffers for each otw channel + if (not get_send_buffs(state.managed_buffs)) return 0; - std::vector<const void *> io_buffs(chans_per_otw_buff); for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ //calculate pointers with offsets to io and otw memory for (size_t j = 0; j < chans_per_otw_buff; j++){ - io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes; + state.io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes; } - boost::uint32_t *otw_mem = send_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32; + boost::uint32_t *otw_mem = state.managed_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32; //pack metadata into a vrt header vrt_packer(otw_mem, if_packet_info); otw_mem += if_packet_info.num_header_words32; //copy-convert the samples into the send buffer - uhd::convert::output_type otw_buffs(1, otw_mem); - converter(io_buffs, otw_buffs, num_samps); + state.otw_buffs[0] = otw_mem; + converter(state.io_buffs, state.otw_buffs, num_samps); //commit the samples to the zero-copy interface size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); - send_buffs[i]->commit(num_bytes_total); + state.managed_buffs[i]->commit(num_bytes_total); } state.next_packet_seq++; //increment sequence after commits return num_samps; @@ -359,7 +374,7 @@ template <typename T> UHD_INLINE T get_context_code( ******************************************************************/ static UHD_INLINE size_t send( send_state &state, - const std::vector<const void *> &buffs, + const uhd::device::send_buffs_type &buffs, const size_t total_num_samps, const uhd::tx_metadata_t &metadata, uhd::device::send_mode_t send_mode, @@ -372,6 +387,8 @@ template <typename T> UHD_INLINE T get_context_code( size_t vrt_header_offset_words32 = 0, size_t chans_per_otw_buff = 1 ){ + state.io_buffs.resize(chans_per_otw_buff); + uhd::convert::function_type converter( uhd::convert::get_converter_cpu_to_otw( io_type, otw_type, chans_per_otw_buff, 1 @@ -398,19 +415,11 @@ template <typename T> UHD_INLINE T get_context_code( if_packet_info.sob = metadata.start_of_burst; if_packet_info.eob = metadata.end_of_burst; - //TODO remove this code when sample counts of zero are supported by hardware - std::vector<const void *> buffs_(buffs); - size_t total_num_samps_(total_num_samps); - if (total_num_samps == 0){ - static const boost::uint64_t zeros = 0; //max size of a host sample - buffs_ = std::vector<const void *>(buffs.size(), &zeros); - total_num_samps_ = 1; - } - return _send1( state, - buffs_, 0, - std::min(total_num_samps_, max_samples_per_packet), + //TODO remove this code when sample counts of zero are supported by hardware + (total_num_samps)?buffs : state.zero_buffs, 0, + std::max<size_t>(1, std::min(total_num_samps, max_samples_per_packet)), if_packet_info, converter, vrt_packer, diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp index a5a864a04..b91eaae1d 100644 --- a/host/lib/transport/zero_copy.cpp +++ b/host/lib/transport/zero_copy.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -29,10 +29,9 @@ static void release_nop(void){ class safe_managed_receive_buffer : public managed_recv_buffer{ public: safe_managed_receive_buffer( - const boost::asio::const_buffer &buff, - const release_fcn_t &release_fcn + const void *buff, size_t size, const release_fcn_t &release_fcn ): - _buff(buff), _release_fcn(release_fcn) + _buff(buff), _size(size), _release_fcn(release_fcn) { /* NOP */ } @@ -48,19 +47,23 @@ public: } private: - const boost::asio::const_buffer &get(void) const{ + const void *get_buff(void) const{ return _buff; } - const boost::asio::const_buffer _buff; + size_t get_size(void) const{ + return _size; + } + + const void *_buff; + size_t _size; release_fcn_t _release_fcn; }; managed_recv_buffer::sptr managed_recv_buffer::make_safe( - const boost::asio::const_buffer &buff, - const release_fcn_t &release_fcn + const void *buff, size_t size, const release_fcn_t &release_fcn ){ - return sptr(new safe_managed_receive_buffer(buff, release_fcn)); + return sptr(new safe_managed_receive_buffer(buff, size, release_fcn)); } /*********************************************************************** @@ -73,10 +76,9 @@ static void commit_nop(size_t){ class safe_managed_send_buffer : public managed_send_buffer{ public: safe_managed_send_buffer( - const boost::asio::mutable_buffer &buff, - const commit_fcn_t &commit_fcn + void *buff, size_t size, const commit_fcn_t &commit_fcn ): - _buff(buff), _commit_fcn(commit_fcn) + _buff(buff), _size(size), _commit_fcn(commit_fcn) { /* NOP */ } @@ -92,17 +94,21 @@ public: } private: - const boost::asio::mutable_buffer &get(void) const{ + void *get_buff(void) const{ return _buff; } - const boost::asio::mutable_buffer _buff; + size_t get_size(void) const{ + return _size; + } + + void *_buff; + size_t _size; commit_fcn_t _commit_fcn; }; safe_managed_send_buffer::sptr managed_send_buffer::make_safe( - const boost::asio::mutable_buffer &buff, - const commit_fcn_t &commit_fcn + void *buff, size_t size, const commit_fcn_t &commit_fcn ){ - return sptr(new safe_managed_send_buffer(buff, commit_fcn)); + return sptr(new safe_managed_send_buffer(buff, size, commit_fcn)); } diff --git a/host/lib/types/CMakeLists.txt b/host/lib/types/CMakeLists.txt index dfb7cf903..ad625111e 100644 --- a/host/lib/types/CMakeLists.txt +++ b/host/lib/types/CMakeLists.txt @@ -16,6 +16,62 @@ # ######################################################################## +# Setup defines for high resolution timing +######################################################################## +MESSAGE(STATUS "") +MESSAGE(STATUS "Configuring high resolution timing...") +INCLUDE(CheckCXXSourceCompiles) + +SET(CMAKE_REQUIRED_LIBRARIES -lrt) +CHECK_CXX_SOURCE_COMPILES(" + #include <ctime> + int main(){ + timespec ts; + return clock_gettime(CLOCK_MONOTONIC, &ts); + } + " HAVE_CLOCK_GETTIME +) +UNSET(CMAKE_REQUIRED_LIBRARIES) + +INCLUDE(CheckCXXSourceCompiles) +CHECK_CXX_SOURCE_COMPILES(" + #include <mach/mach_time.h> + int main(){ + mach_timebase_info_data_t info; + mach_timebase_info(&info); + mach_absolute_time(); + return 0; + } + " HAVE_MACH_ABSOLUTE_TIME +) + +CHECK_CXX_SOURCE_COMPILES(" + #include <Windows.h> + int main(){ + LARGE_INTEGER value; + QueryPerformanceCounter(&value); + QueryPerformanceFrequency(&value); + return 0; + } + " HAVE_QUERY_PERFORMANCE_COUNTER +) + +IF(HAVE_CLOCK_GETTIME) + MESSAGE(STATUS " High resolution timing supported through clock_gettime.") + ADD_DEFINITIONS(-DTIME_SPEC_USE_CLOCK_GETTIME) + SET(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lrt") +ELSEIF(HAVE_MACH_ABSOLUTE_TIME) + MESSAGE(STATUS " High resolution timing supported through mach_absolute_time.") + ADD_DEFINITIONS(-DTIME_SPEC_USE_MACH_ABSOLUTE_TIME) +ELSEIF(HAVE_QUERY_PERFORMANCE_COUNTER) + MESSAGE(STATUS " High resolution timing supported through QueryPerformanceCounter.") + ADD_DEFINITIONS(-DTIME_SPEC_USE_QUERY_PERFORMANCE_COUNTER) +ELSE() + MESSAGE(STATUS " High resolution timing supported though microsec_clock.") + ADD_DEFINITIONS(-DTIME_SPEC_USE_MICROSEC_CLOCK) +ENDIF() + +######################################################################## # This file included, use CMake directory variables ######################################################################## LIBUHD_APPEND_SOURCES( diff --git a/host/lib/types/time_spec.cpp b/host/lib/types/time_spec.cpp index f39625a11..4a41f0fb9 100644 --- a/host/lib/types/time_spec.cpp +++ b/host/lib/types/time_spec.cpp @@ -19,6 +19,70 @@ #include <boost/math/special_functions/round.hpp> using namespace uhd; + +/*********************************************************************** + * Time spec system time + **********************************************************************/ + +/*! + * Creates a time spec from system counts: + * TODO make part of API as a static factory function + * The counts type is 64 bits and will overflow the ticks type of long. + * Therefore, divmod the counts into seconds + sub-second counts first. + */ +#include <inttypes.h> //imaxdiv, intmax_t +static UHD_INLINE time_spec_t time_spec_t_from_counts(intmax_t counts, intmax_t freq){ + imaxdiv_t divres = imaxdiv(counts, freq); + return time_spec_t(time_t(divres.quot), double(divres.rem)/freq); +} + +#ifdef TIME_SPEC_USE_CLOCK_GETTIME +#include <time.h> +time_spec_t time_spec_t::get_system_time(void){ + timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); + return time_spec_t(ts.tv_sec, ts.tv_nsec, 1e9); +} +#endif /* TIME_SPEC_USE_CLOCK_GETTIME */ + + +#ifdef TIME_SPEC_USE_MACH_ABSOLUTE_TIME +#include <mach/mach_time.h> +time_spec_t time_spec_t::get_system_time(void){ + mach_timebase_info_data_t info; mach_timebase_info(&info); + intmax_t nanosecs = mach_absolute_time()*info.numer/info.denom; + return time_spec_t_from_counts(nanosecs, intmax_t(1e9)); +} +#endif /* TIME_SPEC_USE_MACH_ABSOLUTE_TIME */ + + +#ifdef TIME_SPEC_USE_QUERY_PERFORMANCE_COUNTER +#include <Windows.h> +time_spec_t time_spec_t::get_system_time(void){ + LARGE_INTEGER counts, freq; + QueryPerformanceCounter(&counts); + QueryPerformanceFrequency(&freq); + return time_spec_t_from_counts(counts.QuadPart, freq.QuadPart); +} +#endif /* TIME_SPEC_USE_QUERY_PERFORMANCE_COUNTER */ + + +#ifdef TIME_SPEC_USE_MICROSEC_CLOCK +#include <boost/date_time/posix_time/posix_time.hpp> +namespace pt = boost::posix_time; +time_spec_t time_spec_t::get_system_time(void){ + pt::ptime time_now = pt::microsec_clock::universal_time(); + pt::time_duration time_dur = time_now - pt::from_time_t(0); + return time_spec_t( + time_t(time_dur.total_seconds()), + long(time_dur.fractional_seconds()), + double(pt::time_duration::ticks_per_second()) + ); +} +#endif /* TIME_SPEC_USE_MICROSEC_CLOCK */ + +/*********************************************************************** + * Time spec constructors + **********************************************************************/ time_spec_t::time_spec_t(double secs): _full_secs(0), _frac_secs(secs) @@ -35,11 +99,14 @@ time_spec_t::time_spec_t(time_t full_secs, double frac_secs): time_spec_t::time_spec_t(time_t full_secs, long tick_count, double tick_rate): _full_secs(full_secs), - _frac_secs(double(tick_count)/tick_rate) + _frac_secs(tick_count/tick_rate) { /* NOP */ } +/*********************************************************************** + * Time spec accessors + **********************************************************************/ long time_spec_t::get_tick_count(double tick_rate) const{ return boost::math::iround(this->get_frac_secs()*tick_rate); } @@ -49,15 +116,16 @@ double time_spec_t::get_real_secs(void) const{ } time_t time_spec_t::get_full_secs(void) const{ - double intpart; - std::modf(this->_frac_secs, &intpart); - return this->_full_secs + time_t(intpart); + return this->_full_secs + time_t(this->_frac_secs); } double time_spec_t::get_frac_secs(void) const{ - return std::fmod(this->_frac_secs, 1.0); + return this->_frac_secs - time_t(this->_frac_secs); } +/*********************************************************************** + * Time spec math overloads + **********************************************************************/ time_spec_t &time_spec_t::operator+=(const time_spec_t &rhs){ this->_full_secs += rhs.get_full_secs(); this->_frac_secs += rhs.get_frac_secs(); diff --git a/host/lib/types/types.cpp b/host/lib/types/types.cpp index 34d5947eb..c1be2ff6d 100644 --- a/host/lib/types/types.cpp +++ b/host/lib/types/types.cpp @@ -68,6 +68,7 @@ otw_type_t::otw_type_t(void): **********************************************************************/ static size_t tid_to_size(io_type_t::tid_t tid){ switch(tid){ + case io_type_t::COMPLEX_FLOAT64: return sizeof(std::complex<double>); case io_type_t::COMPLEX_FLOAT32: return sizeof(std::complex<float>); case io_type_t::COMPLEX_INT16: return sizeof(std::complex<boost::int16_t>); case io_type_t::COMPLEX_INT8: return sizeof(std::complex<boost::int8_t>); diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index 9fa1b4f72..88cbab073 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -91,7 +91,6 @@ struct usrp1_impl::io_impl{ void commit_send_buff(offset_send_buffer::sptr, offset_send_buffer::sptr, size_t); void flush_send_buff(void); bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &, double); - bool transmitting_enb; }; /*! @@ -133,6 +132,9 @@ void usrp1_impl::io_impl::flush_send_buff(void){ //calculate the number of bytes to alignment size_t bytes_to_pad = (-1*curr_buff->offset)%alignment_padding; + //send at least alignment_padding to guarantee zeros are sent + if (bytes_to_pad == 0) bytes_to_pad = alignment_padding; + //get the buffer, clear, and commit (really current buffer) vrt_packet_handler::managed_send_buffs_t buffs(1); if (this->get_send_buffs(buffs, 0.1)){ @@ -157,10 +159,8 @@ bool usrp1_impl::io_impl::get_send_buffs( //calculate the buffer pointer and size given the offset //references to the buffers are held in the bound function buffs[0] = managed_send_buffer::make_safe( - boost::asio::buffer( - curr_buff->buff->cast<char *>() + curr_buff->offset, - curr_buff->buff->size() - curr_buff->offset - ), + curr_buff->buff->cast<char *>() + curr_buff->offset, + curr_buff->buff->size() - curr_buff->offset, boost::bind(&usrp1_impl::io_impl::commit_send_buff, this, curr_buff, next_buff, _1) ); @@ -190,7 +190,7 @@ void usrp1_impl::io_init(void){ ); rx_stream_on_off(false); - tx_stream_on_off(false); + _io_impl->flush_send_buff(); } void usrp1_impl::rx_stream_on_off(bool enb){ @@ -201,13 +201,6 @@ void usrp1_impl::rx_stream_on_off(bool enb){ } } -void usrp1_impl::tx_stream_on_off(bool enb){ - if (not enb) _io_impl->flush_send_buff(); - _codec_ctrls[DBOARD_SLOT_A]->enable_tx_digital(enb); - _codec_ctrls[DBOARD_SLOT_B]->enable_tx_digital(enb); - _io_impl->transmitting_enb = enb; -} - /*********************************************************************** * Data send + helper functions **********************************************************************/ @@ -227,12 +220,11 @@ size_t usrp1_impl::get_max_send_samps_per_packet(void) const { } size_t usrp1_impl::send( - const std::vector<const void *> &buffs, size_t num_samps, + const send_buffs_type &buffs, size_t num_samps, const tx_metadata_t &metadata, const io_type_t &io_type, send_mode_t send_mode, double timeout ){ if (_soft_time_ctrl->send_pre(metadata, timeout)) return num_samps; - if (not _io_impl->transmitting_enb) tx_stream_on_off(true); size_t num_samps_sent = vrt_packet_handler::send( _io_impl->packet_handler_send_state, //last state of the send handler @@ -250,7 +242,7 @@ size_t usrp1_impl::send( //handle eob flag (commit the buffer, disable the DACs) //check num samps sent to avoid flush on incomplete/timeout if (metadata.end_of_burst and num_samps_sent == num_samps){ - this->tx_stream_on_off(false); + _io_impl->flush_send_buff(); } //handle the polling for underflow conditions @@ -306,7 +298,7 @@ size_t usrp1_impl::get_max_recv_samps_per_packet(void) const { } size_t usrp1_impl::recv( - const std::vector<void *> &buffs, size_t num_samps, + const recv_buffs_type &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, double timeout ){ diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index c91ecc7ed..e1b671811 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -18,7 +18,8 @@ #include "soft_time_ctrl.hpp" #include <uhd/transport/bounded_buffer.hpp> #include <boost/any.hpp> -#include <boost/thread.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/condition_variable.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <iostream> @@ -27,30 +28,7 @@ using namespace uhd::usrp; using namespace uhd::transport; namespace pt = boost::posix_time; -static const time_spec_t TWIDDLE(0.0015); - -/*********************************************************************** - * Utility helper functions - **********************************************************************/ - -//TODO put these in time_spec_t (maybe useful) - -static const double time_dur_tps = double(pt::time_duration::ticks_per_second()); - -time_spec_t time_dur_to_time_spec(const pt::time_duration &time_dur){ - return time_spec_t( - time_dur.total_seconds(), - long(time_dur.fractional_seconds()), - time_dur_tps - ); -} - -pt::time_duration time_spec_to_time_dur(const time_spec_t &time_spec){ - return pt::time_duration( - 0, 0, long(time_spec.get_full_secs()), - time_spec.get_tick_count(time_dur_tps) - ); -} +static const time_spec_t TWIDDLE(0.0011); /*********************************************************************** * Soft time control implementation @@ -61,7 +39,7 @@ public: soft_time_ctrl_impl(const cb_fcn_type &stream_on_off): _nsamps_remaining(0), _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS), - _cmd_queue(bounded_buffer<boost::any>::make(2)), + _cmd_queue(2), _stream_on_off(stream_on_off) { //synchronously spawn a new thread @@ -84,7 +62,7 @@ public: ******************************************************************/ void set_time(const time_spec_t &time){ boost::mutex::scoped_lock lock(_update_mutex); - _time_offset = boost::get_system_time() - time_spec_to_time_dur(time); + _time_offset = time_spec_t::get_system_time() - time; } time_spec_t get_time(void){ @@ -94,7 +72,7 @@ public: UHD_INLINE time_spec_t time_now(void){ //internal get time without scoped lock - return time_dur_to_time_spec(boost::get_system_time() - _time_offset); + return time_spec_t::get_system_time() - _time_offset; } UHD_INLINE void sleep_until_time( @@ -102,7 +80,8 @@ public: ){ boost::condition_variable cond; //use a condition variable to unlock, sleep, lock - cond.timed_wait(lock, _time_offset + time_spec_to_time_dur(time)); + double seconds_to_sleep = (time - time_now()).get_real_secs(); + cond.timed_wait(lock, pt::microseconds(long(seconds_to_sleep*1e6))); } /******************************************************************* @@ -133,7 +112,7 @@ public: } void issue_stream_cmd(const stream_cmd_t &cmd){ - _cmd_queue->push_with_wait(cmd); + _cmd_queue.push_with_wait(cmd); } void stream_on_off(bool enb){ @@ -201,7 +180,7 @@ public: try{ boost::any cmd; while (true){ - _cmd_queue->pop_with_wait(cmd); + _cmd_queue.pop_with_wait(cmd); recv_cmd_handle_cmd(boost::any_cast<stream_cmd_t>(cmd)); } } catch(const boost::thread_interrupted &){} @@ -211,8 +190,8 @@ private: boost::mutex _update_mutex; size_t _nsamps_remaining; stream_cmd_t::stream_mode_t _stream_mode; - pt::ptime _time_offset; - bounded_buffer<boost::any>::sptr _cmd_queue; + time_spec_t _time_offset; + bounded_buffer<boost::any> _cmd_queue; const cb_fcn_type _stream_on_off; boost::thread_group _thread_group; }; diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index 057725394..1d9f6709f 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -80,13 +80,13 @@ public: ~usrp1_impl(void); //the io interface - size_t send(const std::vector<const void *> &, + size_t send(const send_buffs_type &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, send_mode_t, double); - size_t recv(const std::vector<void *> &, + size_t recv(const recv_buffs_type &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, recv_mode_t, double); @@ -125,7 +125,6 @@ private: UHD_PIMPL_DECL(io_impl) _io_impl; void io_init(void); void rx_stream_on_off(bool); - void tx_stream_on_off(bool); void handle_overrun(size_t); //underrun and overrun poll intervals diff --git a/host/lib/usrp/usrp2/codec_impl.cpp b/host/lib/usrp/usrp2/codec_impl.cpp index d7078d985..09bec6db2 100644 --- a/host/lib/usrp/usrp2/codec_impl.cpp +++ b/host/lib/usrp/usrp2/codec_impl.cpp @@ -47,6 +47,12 @@ void usrp2_mboard_impl::codec_init(void){ boost::bind(&usrp2_mboard_impl::tx_codec_get, this, _1, _2), boost::bind(&usrp2_mboard_impl::tx_codec_set, this, _1, _2) ); + + //initialize gain names. keeps get_rx_gain() from getting a gain + //that hasn't been set yet. + BOOST_FOREACH(std::string key, codec_rx_gain_ranges.keys()) { + _codec_rx_gains[key] = codec_rx_gain_ranges[key].start(); + } } /*********************************************************************** diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 30eaecae2..d09ce1871 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -32,6 +32,18 @@ using namespace uhd; using namespace uhd::usrp; using namespace uhd::transport; namespace asio = boost::asio; +namespace pt = boost::posix_time; + +/*********************************************************************** + * helpers + **********************************************************************/ +static UHD_INLINE pt::time_duration to_time_dur(double timeout){ + return pt::microseconds(long(timeout*1e6)); +} + +static UHD_INLINE double from_time_dur(const pt::time_duration &time_dur){ + return 1e-6*time_dur.total_microseconds(); +} /*********************************************************************** * constants @@ -61,6 +73,7 @@ public: _last_seq_out = 0; _last_seq_ack = 0; _max_seqs_out = max_seqs_out; + _ready_fcn = boost::bind(&flow_control_monitor::ready, this); } /*! @@ -73,11 +86,8 @@ public: boost::this_thread::disable_interruption di; //disable because the wait can throw boost::unique_lock<boost::mutex> lock(_fc_mutex); _last_seq_out = seq; - return _fc_cond.timed_wait( - lock, - boost::posix_time::microseconds(long(timeout*1e6)), - boost::bind(&flow_control_monitor::ready, this) - ); + if (this->ready()) return true; + return _fc_cond.timed_wait(lock, to_time_dur(timeout), _ready_fcn); } /*! @@ -99,6 +109,7 @@ private: boost::mutex _fc_mutex; boost::condition _fc_cond; seq_type _last_seq_out, _last_seq_ack, _max_seqs_out; + boost::function<bool(void)> _ready_fcn; }; /*********************************************************************** @@ -110,11 +121,16 @@ private: **********************************************************************/ struct usrp2_impl::io_impl{ - io_impl(size_t send_frame_size, size_t width): - packet_handler_recv_state(width), - async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/)) + io_impl(size_t send_frame_size, const std::vector<zero_copy_if::sptr> &xports): + xports(xports), + packet_handler_recv_state(xports.size()), + packet_handler_send_state(xports.size()), + async_msg_fifo(100/*messages deep*/) { - for (size_t i = 0; i < width; i++){ + get_recv_buffs_fcn = boost::bind(&usrp2_impl::io_impl::get_recv_buffs, this, _1); + get_send_buffs_fcn = boost::bind(&usrp2_impl::io_impl::get_send_buffs, this, _1); + + for (size_t i = 0; i < xports.size(); i++){ fc_mons.push_back(flow_control_monitor::sptr( new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size) )); @@ -135,31 +151,32 @@ struct usrp2_impl::io_impl{ recv_pirate_crew.join_all(); } - bool get_send_buffs( - const std::vector<zero_copy_if::sptr> &trans, - vrt_packet_handler::managed_send_buffs_t &buffs, - double timeout - ){ - UHD_ASSERT_THROW(trans.size() == buffs.size()); + bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &buffs){ + UHD_ASSERT_THROW(xports.size() == buffs.size()); //calculate the flow control word const boost::uint32_t fc_word32 = packet_handler_send_state.next_packet_seq; //grab a managed buffer for each index for (size_t i = 0; i < buffs.size(); i++){ - if (not fc_mons[i]->check_fc_condition(fc_word32, timeout)) return false; - buffs[i] = trans[i]->get_send_buff(timeout); + if (not fc_mons[i]->check_fc_condition(fc_word32, send_timeout)) return false; + buffs[i] = xports[i]->get_send_buff(send_timeout); if (not buffs[i].get()) return false; buffs[i]->cast<boost::uint32_t *>()[0] = uhd::htonx(fc_word32); } return true; } - bool get_recv_buffs( - const std::vector<zero_copy_if::sptr> &xports, - vrt_packet_handler::managed_recv_buffs_t &buffs, - double timeout - ); + bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs); + + const std::vector<zero_copy_if::sptr> &xports; + + //timeouts set on calls to recv/send (passed into get buffs methods) + double recv_timeout, send_timeout; + + //bound callbacks for get buffs (bound once here, not in fast-path) + vrt_packet_handler::get_recv_buffs_t get_recv_buffs_fcn; + vrt_packet_handler::get_send_buffs_t get_send_buffs_fcn; //previous state for each buffer std::vector<vrt::if_packet_info_t> prev_infos; @@ -175,7 +192,7 @@ struct usrp2_impl::io_impl{ void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t); boost::thread_group recv_pirate_crew; bool recv_pirate_crew_raiding; - bounded_buffer<async_metadata_t>::sptr async_msg_fifo; + bounded_buffer<async_metadata_t> async_msg_fifo; boost::mutex spawn_mutex; }; @@ -228,7 +245,7 @@ void usrp2_impl::io_impl::recv_pirate_loop( //print the famous U, and push the metadata into the message queue if (metadata.event_code & underflow_flags) std::cerr << "U" << std::flush; //else std::cout << "metadata.event_code " << metadata.event_code << std::endl; - async_msg_fifo->push_with_pop_on_full(metadata); + async_msg_fifo.push_with_pop_on_full(metadata); } else{ //TODO unknown received packet, may want to print error... @@ -248,7 +265,7 @@ void usrp2_impl::io_init(void){ const size_t send_frame_size = _data_transports.front()->get_send_frame_size(); //create new io impl - _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports.size())); + _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports)); //create a new pirate thread for each zc if (yarr!!) for (size_t i = 0; i < _data_transports.size(); i++){ @@ -274,7 +291,7 @@ bool usrp2_impl::recv_async_msg( async_metadata_t &async_metadata, double timeout ){ boost::this_thread::disable_interruption di; //disable because the wait can throw - return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout); + return _io_impl->async_msg_fifo.pop_with_timed_wait(async_metadata, timeout); } /*********************************************************************** @@ -291,10 +308,11 @@ size_t usrp2_impl::get_max_send_samps_per_packet(void) const{ } size_t usrp2_impl::send( - const std::vector<const void *> &buffs, size_t num_samps, + const send_buffs_type &buffs, size_t num_samps, const tx_metadata_t &metadata, const io_type_t &io_type, send_mode_t send_mode, double timeout ){ + _io_impl->send_timeout = timeout; return vrt_packet_handler::send( _io_impl->packet_handler_send_state, //last state of the send handler buffs, num_samps, //buffer to fill @@ -302,7 +320,7 @@ size_t usrp2_impl::send( io_type, _tx_otw_type, //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_pack_be, - boost::bind(&usrp2_impl::io_impl::get_send_buffs, _io_impl.get(), _data_transports, _1, timeout), + _io_impl->get_send_buffs_fcn, get_max_send_samps_per_packet(), vrt_send_header_offset_words32 ); @@ -311,14 +329,6 @@ size_t usrp2_impl::send( /*********************************************************************** * Alignment logic on receive **********************************************************************/ -static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ - return boost::posix_time::microseconds(long(timeout*1e6)); -} - -static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ - return 1e-6*time_dur.total_microseconds(); -} - static UHD_INLINE time_spec_t extract_time_spec( const vrt::if_packet_info_t &packet_info ){ @@ -360,12 +370,10 @@ static UHD_INLINE bool handle_msg_packet( } UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( - const std::vector<zero_copy_if::sptr> &xports, - vrt_packet_handler::managed_recv_buffs_t &buffs, - double timeout + vrt_packet_handler::managed_recv_buffs_t &buffs ){ if (buffs.size() == 1){ - buffs[0] = xports[0]->get_recv_buff(timeout); + buffs[0] = xports[0]->get_recv_buff(recv_timeout); if (buffs[0].get() == NULL) return false; bool clear, msg; time_spec_t time; //unused variables //call extract_packet_info to handle printing the overflows @@ -373,7 +381,7 @@ UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( return true; } //-------------------- begin alignment logic ---------------------// - boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); + boost::system_time exit_time = boost::get_system_time() + to_time_dur(recv_timeout); managed_recv_buffer::sptr buff_tmp; std::list<size_t> _all_indexes, indexes_to_do; for (size_t i = 0; i < buffs.size(); i++) _all_indexes.push_back(i); @@ -454,10 +462,11 @@ static void handle_overflow(std::vector<usrp2_mboard_impl::sptr> &mboards, size_ } size_t usrp2_impl::recv( - const std::vector<void *> &buffs, size_t num_samps, + const recv_buffs_type &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, double timeout ){ + _io_impl->recv_timeout = timeout; return vrt_packet_handler::recv( _io_impl->packet_handler_recv_state, //last state of the recv handler buffs, num_samps, //buffer to fill @@ -465,7 +474,7 @@ size_t usrp2_impl::recv( io_type, _rx_otw_type, //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_unpack_be, - boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _data_transports, _1, timeout), - boost::bind(&handle_overflow, _mboards, _1) + _io_impl->get_recv_buffs_fcn, + boost::bind(&handle_overflow, boost::ref(_mboards), _1) ); } diff --git a/host/lib/usrp/usrp2/mboard_impl.cpp b/host/lib/usrp/usrp2/mboard_impl.cpp index 784f662d9..397fae636 100644 --- a/host/lib/usrp/usrp2/mboard_impl.cpp +++ b/host/lib/usrp/usrp2/mboard_impl.cpp @@ -157,6 +157,14 @@ usrp2_mboard_impl::usrp2_mboard_impl( //set default subdev specs (*this)[MBOARD_PROP_RX_SUBDEV_SPEC] = subdev_spec_t(); (*this)[MBOARD_PROP_TX_SUBDEV_SPEC] = subdev_spec_t(); + + //This is a hack/fix for the lingering packet problem. + stream_cmd_t stream_cmd(stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE); + stream_cmd.num_samps = 1; + this->issue_ddc_stream_cmd(stream_cmd); + data_transport->get_recv_buff().get(); //recv with timeout for lingering + data_transport->get_recv_buff().get(); //recv with timeout for expected + _iface->poke32(_iface->regs.rx_ctrl_clear_overrun, 1); //resets sequence } usrp2_mboard_impl::~usrp2_mboard_impl(void){ diff --git a/host/lib/usrp/usrp2/usrp2_iface.cpp b/host/lib/usrp/usrp2/usrp2_iface.cpp index 149c5011f..4407a3011 100644 --- a/host/lib/usrp/usrp2/usrp2_iface.cpp +++ b/host/lib/usrp/usrp2/usrp2_iface.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -42,6 +42,7 @@ public: **********************************************************************/ usrp2_iface_impl(udp_simple::sptr ctrl_transport){ _ctrl_transport = ctrl_transport; + _ctrl_seq_num = 0; mb_eeprom = mboard_eeprom_t(*this, mboard_eeprom_t::MAP_N100); switch(this->get_rev()){ diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index ad95b2a4a..337f842d6 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -200,12 +200,12 @@ public: //the io interface size_t send( - const std::vector<const void *> &, size_t, + const send_buffs_type &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, uhd::device::send_mode_t, double ); size_t recv( - const std::vector<void *> &, size_t, + const recv_buffs_type &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, uhd::device::recv_mode_t, double ); diff --git a/host/lib/usrp/usrp_e100/io_impl.cpp b/host/lib/usrp/usrp_e100/io_impl.cpp index 58faeafb0..5fb2da7b8 100644 --- a/host/lib/usrp/usrp_e100/io_impl.cpp +++ b/host/lib/usrp/usrp_e100/io_impl.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -55,8 +55,8 @@ struct usrp_e100_impl::io_impl{ bool continuous_streaming; io_impl(usrp_e100_iface::sptr iface): data_xport(usrp_e100_make_mmap_zero_copy(iface)), - recv_pirate_booty(recv_booty_type::make(data_xport->get_num_recv_frames())), - async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/)) + recv_pirate_booty(data_xport->get_num_recv_frames()), + async_msg_fifo(100/*messages deep*/) { /* NOP */ } @@ -70,14 +70,13 @@ struct usrp_e100_impl::io_impl{ bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){ UHD_ASSERT_THROW(buffs.size() == 1); boost::this_thread::disable_interruption di; //disable because the wait can throw - return recv_pirate_booty->pop_with_timed_wait(buffs.front(), timeout); + return recv_pirate_booty.pop_with_timed_wait(buffs.front(), timeout); } //a pirate's life is the life for me! void recv_pirate_loop(usrp_e100_clock_ctrl::sptr); - typedef bounded_buffer<managed_recv_buffer::sptr> recv_booty_type; - recv_booty_type::sptr recv_pirate_booty; - bounded_buffer<async_metadata_t>::sptr async_msg_fifo; + bounded_buffer<managed_recv_buffer::sptr> recv_pirate_booty; + bounded_buffer<async_metadata_t> async_msg_fifo; boost::thread_group recv_pirate_crew; bool recv_pirate_crew_raiding; }; @@ -115,7 +114,7 @@ void usrp_e100_impl::io_impl::recv_pirate_loop(usrp_e100_clock_ctrl::sptr clock_ if (if_packet_info.sid == rx_data_inline_sid){ if (recv_debug) std::cout << "this is rx_data_inline_sid\n"; //same number of frames as the data transport -> always immediate - recv_pirate_booty->push_with_wait(buff); + recv_pirate_booty.push_with_wait(buff); continue; } @@ -134,7 +133,7 @@ void usrp_e100_impl::io_impl::recv_pirate_loop(usrp_e100_clock_ctrl::sptr clock_ //print the famous U, and push the metadata into the message queue if (metadata.event_code & underflow_flags) std::cerr << "U" << std::flush; - async_msg_fifo->push_with_pop_on_full(metadata); + async_msg_fifo.push_with_pop_on_full(metadata); continue; } @@ -224,7 +223,7 @@ size_t usrp_e100_impl::get_max_send_samps_per_packet(void) const{ } size_t usrp_e100_impl::send( - const std::vector<const void *> &buffs, size_t num_samps, + const send_buffs_type &buffs, size_t num_samps, const tx_metadata_t &metadata, const io_type_t &io_type, send_mode_t send_mode, double timeout ){ @@ -254,7 +253,7 @@ size_t usrp_e100_impl::get_max_recv_samps_per_packet(void) const{ } size_t usrp_e100_impl::recv( - const std::vector<void *> &buffs, size_t num_samps, + const recv_buffs_type &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, double timeout ){ @@ -277,5 +276,5 @@ bool usrp_e100_impl::recv_async_msg( async_metadata_t &async_metadata, double timeout ){ boost::this_thread::disable_interruption di; //disable because the wait can throw - return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout); + return _io_impl->async_msg_fifo.pop_with_timed_wait(async_metadata, timeout); } diff --git a/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp b/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp index 864e82099..897616320 100644 --- a/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp +++ b/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -83,8 +83,8 @@ public: ~usrp_e100_impl(void); //the io interface - size_t send(const std::vector<const void *> &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, send_mode_t, double); - size_t recv(const std::vector<void *> &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, recv_mode_t, double); + size_t send(const send_buffs_type &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, send_mode_t, double); + size_t recv(const recv_buffs_type &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, recv_mode_t, double); bool recv_async_msg(uhd::async_metadata_t &, double); size_t get_max_send_samps_per_packet(void) const; size_t get_max_recv_samps_per_packet(void) const; diff --git a/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp b/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp index bf378a9b1..4e0137fdb 100644 --- a/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp +++ b/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -23,7 +23,6 @@ #include <unistd.h> //getpagesize #include <poll.h> //poll #include <boost/bind.hpp> -#include <boost/enable_shared_from_this.hpp> #include <iostream> using namespace uhd; @@ -36,7 +35,7 @@ static const size_t poll_breakout = 10; //how many poll timeouts constitute a fu /*********************************************************************** * The zero copy interface implementation **********************************************************************/ -class usrp_e100_mmap_zero_copy_impl : public zero_copy_if, public boost::enable_shared_from_this<usrp_e100_mmap_zero_copy_impl> { +class usrp_e100_mmap_zero_copy_impl : public zero_copy_if{ public: usrp_e100_mmap_zero_copy_impl(usrp_e100_iface::sptr iface): _fd(iface->get_file_descriptor()), _recv_index(0), _send_index(0) @@ -125,8 +124,8 @@ public: //return the managed buffer for this frame if (fp_verbose) std::cout << " make_recv_buff: " << info->len << std::endl; return managed_recv_buffer::make_safe( - boost::asio::const_buffer(mem, info->len), - boost::bind(&usrp_e100_mmap_zero_copy_impl::release, shared_from_this(), info) + mem, info->len, + boost::bind(&usrp_e100_mmap_zero_copy_impl::release, this, info) ); } @@ -161,8 +160,8 @@ public: //return the managed buffer for this frame if (fp_verbose) std::cout << " make_send_buff: " << _frame_size << std::endl; return managed_send_buffer::make_safe( - boost::asio::mutable_buffer(mem, _frame_size), - boost::bind(&usrp_e100_mmap_zero_copy_impl::commit, shared_from_this(), info, _1) + mem, _frame_size, + boost::bind(&usrp_e100_mmap_zero_copy_impl::commit, this, info, _1) ); } |