diff options
author | Josh Blum <josh@joshknows.com> | 2011-06-15 14:50:02 -0700 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2011-06-15 14:50:02 -0700 |
commit | 0cef788d3d34a298c975e32c488e800a5c65ccce (patch) | |
tree | 14b7f4242a28927b1701a73a3e8ec0df3fa04811 /host/lib/usrp/usrp2/io_impl.cpp | |
parent | 7951170c6161d9266726ec19e8c009500cf11f75 (diff) | |
parent | 27f1622d439ceb787e7dada733d0eb82270c5532 (diff) | |
download | uhd-0cef788d3d34a298c975e32c488e800a5c65ccce.tar.gz uhd-0cef788d3d34a298c975e32c488e800a5c65ccce.tar.bz2 uhd-0cef788d3d34a298c975e32c488e800a5c65ccce.zip |
Merge branch 'next'
Diffstat (limited to 'host/lib/usrp/usrp2/io_impl.cpp')
-rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 286 |
1 files changed, 75 insertions, 211 deletions
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 33f249599..ffe9a88e7 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -15,18 +15,21 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. // -#include "../../transport/vrt_packet_handler.hpp" +#include "../../transport/super_recv_packet_handler.hpp" +#include "../../transport/super_send_packet_handler.hpp" #include "usrp2_impl.hpp" #include "usrp2_regs.hpp" #include <uhd/utils/log.hpp> #include <uhd/utils/msg.hpp> #include <uhd/exception.hpp> #include <uhd/usrp/mboard_props.hpp> +#include <uhd/usrp/dsp_props.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/utils/thread_priority.hpp> #include <uhd/transport/bounded_buffer.hpp> #include <boost/format.hpp> #include <boost/bind.hpp> +#include <boost/thread/mutex.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/barrier.hpp> #include <iostream> @@ -80,14 +83,21 @@ public: } /*! + * Gets the current sequence number to go out. + * Increments the sequence for the next call + * \return the sequence to be sent to the dsp + */ + UHD_INLINE seq_type get_curr_seq_out(void){ + return _last_seq_out++; + } + + /*! * Check the flow control condition. - * \param seq the sequence to go out * \param timeout the timeout in seconds * \return false on timeout */ - UHD_INLINE bool check_fc_condition(seq_type seq, double timeout){ - boost::unique_lock<boost::mutex> lock(_fc_mutex); - _last_seq_out = seq; + UHD_INLINE bool check_fc_condition(double timeout){ + boost::mutex::scoped_lock lock(_fc_mutex); if (this->ready()) return true; boost::this_thread::disable_interruption di; //disable because the wait can throw return _fc_cond.timed_wait(lock, to_time_dur(timeout), _ready_fcn); @@ -98,7 +108,7 @@ public: * \param seq the last sequence number to be ACK'd */ UHD_INLINE void update_fc_condition(seq_type seq){ - boost::unique_lock<boost::mutex> lock(_fc_mutex); + boost::mutex::scoped_lock lock(_fc_mutex); _last_seq_ack = seq; lock.unlock(); _fc_cond.notify_one(); @@ -116,23 +126,6 @@ private: }; /*********************************************************************** - * Alignment indexes class: keeps track of indexes - **********************************************************************/ -class alignment_indexes{ -public: - alignment_indexes(void){_indexes = 0;} - void reset(size_t len){_indexes = (1 << len) - 1;} - size_t front(void){ //TODO replace with look-up table - size_t index = 0; - while ((_indexes & (1 << index)) == 0) index++; - return index; - } - void remove(size_t index){_indexes &= ~(1 << index);} - bool empty(void){return _indexes == 0;} -private: size_t _indexes; -}; - -/*********************************************************************** * io impl details (internal to this file) * - pirate crew * - alignment buffer @@ -143,24 +136,13 @@ struct usrp2_impl::io_impl{ io_impl(std::vector<zero_copy_if::sptr> &dsp_xports): dsp_xports(dsp_xports), //the assumption is that all data transports should be identical - get_recv_buffs_fcn(boost::bind(&usrp2_impl::io_impl::get_recv_buffs, this, _1)), - get_send_buffs_fcn(boost::bind(&usrp2_impl::io_impl::get_send_buffs, this, _1)), async_msg_fifo(100/*messages deep*/) { for (size_t i = 0; i < dsp_xports.size(); i++){ fc_mons.push_back(flow_control_monitor::sptr(new flow_control_monitor( usrp2_impl::sram_bytes/dsp_xports.front()->get_send_frame_size() - )));; + ))); } - - //init empty packet infos - vrt::if_packet_info_t packet_info = vrt::if_packet_info_t(); - packet_info.packet_count = 0xf; - packet_info.has_tsi = true; - packet_info.tsi = 0; - packet_info.has_tsf = true; - packet_info.tsf = 0; - prev_infos.resize(dsp_xports.size(), packet_info); } ~io_impl(void){ @@ -169,38 +151,27 @@ struct usrp2_impl::io_impl{ recv_pirate_crew.join_all(); } - bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &buffs){ - UHD_ASSERT_THROW(send_map.size() == buffs.size()); + managed_send_buffer::sptr get_send_buff(size_t chan, double timeout){ + const size_t index = send_map[chan]; + flow_control_monitor &fc_mon = *fc_mons[index]; - //calculate the flow control word - const boost::uint32_t fc_word32 = packet_handler_send_state.next_packet_seq; + //wait on flow control w/ timeout + if (not fc_mon.check_fc_condition(timeout)) return managed_send_buffer::sptr(); - //grab a managed buffer for each index - for (size_t i = 0; i < buffs.size(); i++){ - if (not fc_mons[send_map[i]]->check_fc_condition(fc_word32, send_timeout)) return false; - buffs[i] = dsp_xports[send_map[i]]->get_send_buff(send_timeout); - if (not buffs[i].get()) return false; - buffs[i]->cast<boost::uint32_t *>()[0] = uhd::htonx(fc_word32); - } - return true; - } + //get a buffer from the transport w/ timeout + managed_send_buffer::sptr buff = dsp_xports[index]->get_send_buff(timeout); + + //write the flow control word into the buffer + if (buff.get()) buff->cast<boost::uint32_t *>()[0] = uhd::htonx(fc_mon.get_curr_seq_out()); - alignment_indexes indexes_to_do; //used in alignment logic - time_spec_t expected_time; //used in alignment logic - bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs); + return buff; + } std::vector<zero_copy_if::sptr> &dsp_xports; //mappings from channel index to dsp xport std::vector<size_t> send_map, recv_map; - //timeouts set on calls to recv/send (passed into get buffs methods) - double recv_timeout, send_timeout; - - //bound callbacks for get buffs (bound once here, not in fast-path) - vrt_packet_handler::get_recv_buffs_t get_recv_buffs_fcn; - vrt_packet_handler::get_send_buffs_t get_send_buffs_fcn; - //previous state for each buffer std::vector<vrt::if_packet_info_t> prev_infos; @@ -208,8 +179,8 @@ struct usrp2_impl::io_impl{ std::vector<flow_control_monitor::sptr> fc_mons; //state management for the vrt packet handler code - vrt_packet_handler::recv_state packet_handler_recv_state; - vrt_packet_handler::send_state packet_handler_send_state; + sph::recv_packet_handler recv_handler; + sph::send_packet_handler send_handler; //methods and variables for the pirate crew void recv_pirate_loop(boost::barrier &, usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t); @@ -258,7 +229,7 @@ void usrp2_impl::io_impl::recv_pirate_loop( metadata.time_spec = time_spec_t( time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq() ); - metadata.event_code = vrt_packet_handler::get_context_code<async_metadata_t::event_code_t>(vrt_hdr, if_packet_info); + metadata.event_code = async_metadata_t::event_code_t(sph::get_context_code(vrt_hdr, if_packet_info)); //catch the flow control packets and react if (metadata.event_code == 0){ @@ -327,8 +298,39 @@ void usrp2_impl::update_xport_channel_mapping(void){ } - _io_impl->packet_handler_recv_state = vrt_packet_handler::recv_state(_io_impl->recv_map.size()); - _io_impl->packet_handler_send_state = vrt_packet_handler::send_state(_io_impl->send_map.size()); + //set all of the relevant properties on the handler + boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); + _io_impl->recv_handler.resize(_io_impl->recv_map.size()); + _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); + _io_impl->recv_handler.set_tick_rate(_mboards.front()->get_master_clock_freq()); + //TODO temporarily use the first dsp rate until we support non-homo rates + const std::string rx_dsp_name = _mboards.at(0)->get_link()[MBOARD_PROP_RX_DSP_NAMES].as<prop_names_t>().at(0); + const double rx_host_rate = _mboards.at(0)->get_link()[named_prop_t(MBOARD_PROP_RX_DSP, rx_dsp_name)][DSP_PROP_HOST_RATE].as<double>(); + _io_impl->recv_handler.set_samp_rate(rx_host_rate); + for (size_t chan = 0; chan < _io_impl->recv_handler.size(); chan++){ + _io_impl->recv_handler.set_xport_chan_get_buff(chan, boost::bind( + &uhd::transport::zero_copy_if::get_recv_buff, + _io_impl->dsp_xports[_io_impl->recv_map[chan]], _1 + )); + } + _io_impl->recv_handler.set_converter(_rx_otw_type); + + //set all of the relevant properties on the handler + boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); + _io_impl->send_handler.resize(_io_impl->send_map.size()); + _io_impl->send_handler.set_vrt_packer(&vrt::if_hdr_pack_be, vrt_send_header_offset_words32); + _io_impl->send_handler.set_tick_rate(_mboards.front()->get_master_clock_freq()); + //TODO temporarily use the first dsp rate until we support non-homo rates + const std::string tx_dsp_name = _mboards.at(0)->get_link()[MBOARD_PROP_TX_DSP_NAMES].as<prop_names_t>().at(0); + const double tx_host_rate = _mboards.at(0)->get_link()[named_prop_t(MBOARD_PROP_TX_DSP, tx_dsp_name)][DSP_PROP_HOST_RATE].as<double>(); + _io_impl->send_handler.set_samp_rate(tx_host_rate); + for (size_t chan = 0; chan < _io_impl->send_handler.size(); chan++){ + _io_impl->send_handler.set_xport_chan_get_buff(chan, boost::bind( + &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), chan, _1 + )); + } + _io_impl->send_handler.set_converter(_tx_otw_type); + _io_impl->send_handler.set_max_samples_per_packet(get_max_send_samps_per_packet()); } /*********************************************************************** @@ -355,143 +357,17 @@ size_t usrp2_impl::get_max_send_samps_per_packet(void) const{ } size_t usrp2_impl::send( - const send_buffs_type &buffs, size_t num_samps, + const send_buffs_type &buffs, size_t nsamps_per_buff, const tx_metadata_t &metadata, const io_type_t &io_type, send_mode_t send_mode, double timeout ){ - _io_impl->send_timeout = timeout; - return vrt_packet_handler::send( - _io_impl->packet_handler_send_state, //last state of the send handler - buffs, num_samps, //buffer to fill - metadata, send_mode, //samples metadata - io_type, _tx_otw_type, //input and output types to convert - _mboards.front()->get_master_clock_freq(), //master clock tick rate - uhd::transport::vrt::if_hdr_pack_be, - _io_impl->get_send_buffs_fcn, - get_max_send_samps_per_packet(), - vrt_send_header_offset_words32 - ); -} - -/*********************************************************************** - * Alignment logic on receive - **********************************************************************/ -static UHD_INLINE time_spec_t extract_time_spec( - const vrt::if_packet_info_t &packet_info -){ - return time_spec_t( //assumes has_tsi and has_tsf are true - time_t(packet_info.tsi), size_t(packet_info.tsf), - 100e6 //tick rate does not have to be correct for comparison purposes + return _io_impl->send_handler.send( + buffs, nsamps_per_buff, + metadata, io_type, + send_mode, timeout ); } -static UHD_INLINE void extract_packet_info( - managed_recv_buffer::sptr &buff, - vrt::if_packet_info_t &prev_info, - time_spec_t &time, bool &clear, bool &msg -){ - //extract packet info - vrt::if_packet_info_t next_info; - next_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t); - vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), next_info); - - //handle the packet count / sequence number - if ((prev_info.packet_count+1)%16 != next_info.packet_count){ - UHD_MSG(fastpath) << "O"; //report overflow (drops in the kernel) - } - - time = extract_time_spec(next_info); - clear = extract_time_spec(prev_info) > time; - msg = next_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA; - prev_info = next_info; -} - -static UHD_INLINE bool handle_msg_packet( - vrt_packet_handler::managed_recv_buffs_t &buffs, size_t index -){ - for (size_t i = 0; i < buffs.size(); i++){ - if (i == index) continue; - buffs[i].reset(); //set NULL - } - return true; -} - -UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( - vrt_packet_handler::managed_recv_buffs_t &buffs -){ - if (buffs.size() == 1){ - buffs[0] = dsp_xports[recv_map[0]]->get_recv_buff(recv_timeout); - if (buffs[0].get() == NULL) return false; - bool clear, msg; time_spec_t time; //unused variables - //call extract_packet_info to handle printing the overflows - extract_packet_info(buffs[0], this->prev_infos[recv_map[0]], time, clear, msg); - return true; - } - //-------------------- begin alignment logic ---------------------// - UHD_ASSERT_THROW(recv_map.size() == buffs.size()); - boost::system_time exit_time = boost::get_system_time() + to_time_dur(recv_timeout); - managed_recv_buffer::sptr buff_tmp; - bool clear, msg; - size_t index; - - //If we did not enter this routine with an empty indexes set, - //jump to after the clear so we can preserve the previous state. - //This saves buffers from being lost when using non-blocking recv. - if (not indexes_to_do.empty()) goto skip_pop_initial; - - //respond to a clear by starting from scratch - got_clear: - indexes_to_do.reset(buffs.size()); - clear = false; - - //do an initial pop to load an initial sequence id - index = indexes_to_do.front(); - buff_tmp = dsp_xports[recv_map[index]]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); - if (buff_tmp.get() == NULL) return false; - extract_packet_info(buff_tmp, this->prev_infos[recv_map[index]], expected_time, clear, msg); - if (clear) goto got_clear; - buffs[index] = buff_tmp; - if (msg) return handle_msg_packet(buffs, index); - indexes_to_do.remove(index); - skip_pop_initial: - - //get an aligned set of elements from the buffers: - while(not indexes_to_do.empty()){ - - //pop an element off for this index - index = indexes_to_do.front(); - buff_tmp = dsp_xports[recv_map[index]]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); - if (buff_tmp.get() == NULL) return false; - time_spec_t this_time; - extract_packet_info(buff_tmp, this->prev_infos[recv_map[index]], this_time, clear, msg); - if (clear) goto got_clear; - buffs[index] = buff_tmp; - if (msg) return handle_msg_packet(buffs, index); - - //if the sequence id matches: - // remove this index from the list and continue - if (this_time == expected_time){ - indexes_to_do.remove(index); - } - - //if the sequence id is newer: - // use the new expected time for comparison - // add all other indexes back into the list - else if (this_time > expected_time){ - expected_time = this_time; - indexes_to_do.reset(buffs.size()); - indexes_to_do.remove(index); - } - - //if the sequence id is older: - // continue with the same index to try again - //else if (this_time < expected_time)... - - } - return true; - //-------------------- end alignment logic -----------------------// -} - /*********************************************************************** * Receive Data **********************************************************************/ @@ -505,26 +381,14 @@ size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{ return bpp/_rx_otw_type.get_sample_size(); } -void usrp2_impl::handle_overflow(size_t chan){ - UHD_MSG(fastpath) << "O"; - ldiv_t indexes = ldiv(chan, usrp2_mboard_impl::NUM_RX_DSPS); - _mboards.at(indexes.quot)->handle_overflow(indexes.rem); -} - size_t usrp2_impl::recv( - const recv_buffs_type &buffs, size_t num_samps, + const recv_buffs_type &buffs, size_t nsamps_per_buff, rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, double timeout ){ - _io_impl->recv_timeout = timeout; - return vrt_packet_handler::recv( - _io_impl->packet_handler_recv_state, //last state of the recv handler - buffs, num_samps, //buffer to fill - metadata, recv_mode, //samples metadata - io_type, _rx_otw_type, //input and output types to convert - _mboards.front()->get_master_clock_freq(), //master clock tick rate - uhd::transport::vrt::if_hdr_unpack_be, - _io_impl->get_recv_buffs_fcn, - boost::bind(&usrp2_impl::handle_overflow, this, _1) + return _io_impl->recv_handler.recv( + buffs, nsamps_per_buff, + metadata, io_type, + recv_mode, timeout ); } |