From 1ef40895952f94ccd21fca48033b5a14d7e4ff30 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sat, 2 Jul 2011 19:35:33 -0700 Subject: usrp1: tweaks + implemented other features to mimic async and inline messages Moved the underflow/overflow polling into a thread and out of the fast-path. Added an inline and async message queue into soft time control. Error and status messages are actually generated now and enqueued. Passes the async message test... --- host/lib/usrp/usrp1/io_impl.cpp | 155 +++++++++++++++++++++++---------- host/lib/usrp/usrp1/soft_time_ctrl.cpp | 48 ++++++++-- host/lib/usrp/usrp1/soft_time_ctrl.hpp | 9 +- host/lib/usrp/usrp1/usrp1_impl.cpp | 4 +- host/lib/usrp/usrp1/usrp1_impl.hpp | 4 + 5 files changed, 159 insertions(+), 61 deletions(-) (limited to 'host/lib/usrp/usrp1') diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index 8e75f2025..461529ae2 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -123,8 +124,6 @@ static void usrp1_bs_vrt_unpacker( struct usrp1_impl::io_impl{ io_impl(zero_copy_if::sptr data_transport): data_transport(data_transport), - underflow_poll_samp_count(0), - overflow_poll_samp_count(0), curr_buff(offset_send_buffer(data_transport->get_send_buff())), omsb(boost::bind(&usrp1_impl::io_impl::commit_send_buff, this, _1, _2, _3)) { @@ -132,6 +131,8 @@ struct usrp1_impl::io_impl{ } ~io_impl(void){ + vandal_tribe.interrupt_all(); + vandal_tribe.join_all(); UHD_SAFE_CALL(flush_send_buff();) } @@ -141,12 +142,6 @@ struct usrp1_impl::io_impl{ sph::recv_packet_handler recv_handler; sph::send_packet_handler send_handler; - //state management for overflow and underflow - size_t underflow_poll_samp_count; - size_t overflow_poll_samp_count; - size_t rx_samps_per_poll_interval; - size_t tx_samps_per_poll_interval; - //wrapper around the actual send buffer interface //all of this to ensure only aligned lengths are committed //NOTE: you must commit before getting a new buffer @@ -163,6 +158,9 @@ struct usrp1_impl::io_impl{ //make a new managed buffer with the offset buffs return omsb.get_new(curr_buff, next_buff); } + + boost::thread_group vandal_tribe; + boost::system_time last_send_time; }; /*! @@ -231,6 +229,14 @@ void usrp1_impl::io_init(void){ _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport)); + //create a new vandal thread to poll xerflow conditions + boost::barrier spawn_barrier(2); + _io_impl->vandal_tribe.create_thread(boost::bind( + &usrp1_impl::vandal_conquest_loop, + this, boost::ref(spawn_barrier) + )); + spawn_barrier.wait(); + //init some handler stuff _io_impl->recv_handler.set_tick_rate(_master_clock_rate); _io_impl->recv_handler.set_vrt_unpacker(&usrp1_bs_vrt_unpacker); @@ -243,23 +249,91 @@ void usrp1_impl::io_init(void){ &usrp1_impl::io_impl::get_send_buff, _io_impl.get(), _1 )); - this->enable_tx(true); //always enabled + //init as disabled, then call the real function (uses restore) + this->enable_rx(false); + this->enable_tx(false); rx_stream_on_off(false); + tx_stream_on_off(false); _io_impl->flush_send_buff(); } void usrp1_impl::rx_stream_on_off(bool enb){ - this->enable_rx(enb); + this->restore_rx(enb); //drain any junk in the receive transport after stop streaming command while(not enb and _data_transport->get_recv_buff().get() != NULL){ /* NOP */ } } +void usrp1_impl::tx_stream_on_off(bool enb){ + _io_impl->last_send_time = boost::get_system_time(); + if (_tx_enabled and not enb) _io_impl->flush_send_buff(); + this->restore_tx(enb); +} + +/*! + * Casually poll the overflow and underflow registers. + * On an underflow, push an async message into the queue and print. + * On an overflow, interleave an inline message into recv and print. + * This procedure creates "soft" inline and async user messages. + */ +void usrp1_impl::vandal_conquest_loop(boost::barrier &spawn_barrier){ + spawn_barrier.wait(); + + //initialize the async metadata + async_metadata_t async_metadata; + async_metadata.channel = 0; + async_metadata.has_time_spec = true; + async_metadata.event_code = async_metadata_t::EVENT_CODE_UNDERFLOW; + + //initialize the inline metadata + rx_metadata_t inline_metadata; + inline_metadata.has_time_spec = true; + inline_metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + + //start the polling loop... + try{ while (not boost::this_thread::interruption_requested()){ + boost::uint8_t underflow = 0, overflow = 0; + + //shutoff transmit if it has been too long since send() was called + if (_tx_enabled and (boost::get_system_time() - _io_impl->last_send_time) > boost::posix_time::milliseconds(100)){ + this->tx_stream_on_off(false); + } + + //always poll regardless of enabled so we can clear the conditions + _fx2_ctrl->usrp_control_read( + VRQ_GET_STATUS, 0, GS_TX_UNDERRUN, &underflow, sizeof(underflow) + ); + _fx2_ctrl->usrp_control_read( + VRQ_GET_STATUS, 0, GS_RX_OVERRUN, &overflow, sizeof(overflow) + ); + + //handle message generation for xerflow conditions + if (_tx_enabled and underflow){ + async_metadata.time_spec = _soft_time_ctrl->get_time(); + _soft_time_ctrl->get_async_queue().push_with_pop_on_full(async_metadata); + UHD_MSG(fastpath) << "U"; + } + if (_rx_enabled and overflow){ + inline_metadata.time_spec = _soft_time_ctrl->get_time(); + _soft_time_ctrl->get_inline_queue().push_with_pop_on_full(inline_metadata); + UHD_MSG(fastpath) << "O"; + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(50)); + }} + catch(const boost::thread_interrupted &){} //normal exit condition + catch(const std::exception &e){ + UHD_MSG(error) << "The vandal caught an unexpected exception " << e.what() << std::endl; + } +} + /*********************************************************************** * Properties callback methods below **********************************************************************/ void usrp1_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){ + boost::mutex::scoped_lock lock = _io_impl->recv_handler.get_scoped_lock(); + //sanity checking validate_subdev_spec(_tree, spec, "rx"); @@ -281,6 +355,8 @@ void usrp1_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){ } void usrp1_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){ + boost::mutex::scoped_lock lock = _io_impl->send_handler.get_scoped_lock(); + //sanity checking validate_subdev_spec(_tree, spec, "tx"); @@ -305,14 +381,13 @@ void usrp1_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){ } double usrp1_impl::update_rx_samp_rate(const double samp_rate){ + boost::mutex::scoped_lock lock = _io_impl->recv_handler.get_scoped_lock(); + size_t rate = boost::math::iround(_master_clock_rate / samp_rate); //clip the rate to something in range: rate = std::min(std::max(rate, 4), 256); - //TODO Poll every 100ms. Make it selectable? - _io_impl->rx_samps_per_poll_interval = size_t(0.1 * _master_clock_rate / rate); - bool s = this->disable_rx(); _iface->poke32(FR_DECIM_RATE, rate/2 - 1); this->restore_rx(s); @@ -322,14 +397,13 @@ double usrp1_impl::update_rx_samp_rate(const double samp_rate){ } double usrp1_impl::update_tx_samp_rate(const double samp_rate){ + boost::mutex::scoped_lock lock = _io_impl->send_handler.get_scoped_lock(); + size_t rate = boost::math::iround(_master_clock_rate / samp_rate); //clip the rate to something in range: rate = std::min(std::max(rate, 4), 256); - //TODO Poll every 100ms. Make it selectable? - _io_impl->tx_samps_per_poll_interval = size_t(0.1 * _master_clock_rate / rate); - bool s = this->disable_tx(); _iface->poke32(FR_INTERP_RATE, rate/2 - 1); this->restore_tx(s); @@ -367,10 +441,11 @@ double usrp1_impl::update_tx_dsp_freq(const size_t dspno, const double freq){ /*********************************************************************** * Async Data **********************************************************************/ -bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, double timeout){ - //dummy fill-in for the recv_async_msg - boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6))); - return false; +bool usrp1_impl::recv_async_msg( + async_metadata_t &async_metadata, double timeout +){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + return _soft_time_ctrl->get_async_queue().pop_with_timed_wait(async_metadata, timeout); } /*********************************************************************** @@ -390,29 +465,23 @@ size_t usrp1_impl::send( ){ if (_soft_time_ctrl->send_pre(metadata, timeout)) return 0; + this->tx_stream_on_off(true); //always enable (it will do the right thing) size_t num_samps_sent = _io_impl->send_handler.send( buffs, nsamps_per_buff, metadata, io_type, send_mode, timeout ); - //handle eob flag (commit the buffer, disable the DACs) + //handle eob flag (commit the buffer, /*disable the DACs*/) //check num samps sent to avoid flush on incomplete/timeout if (metadata.end_of_burst and num_samps_sent == nsamps_per_buff){ - _io_impl->flush_send_buff(); - } - - //handle the polling for underflow conditions - _io_impl->underflow_poll_samp_count += num_samps_sent; - if (_io_impl->underflow_poll_samp_count >= _io_impl->tx_samps_per_poll_interval){ - _io_impl->underflow_poll_samp_count = 0; //reset count - boost::uint8_t underflow = 0; - int ret = _fx2_ctrl->usrp_control_read( - VRQ_GET_STATUS, 0, GS_TX_UNDERRUN, - &underflow, sizeof(underflow) - ); - if (ret < 0) UHD_MSG(error) << "USRP: underflow check failed" << std::endl; - else if (underflow) UHD_MSG(fastpath) << "U"; + async_metadata_t metadata; + metadata.channel = 0; + metadata.has_time_spec = true; + metadata.time_spec = _soft_time_ctrl->get_time(); + metadata.event_code = async_metadata_t::EVENT_CODE_BURST_ACK; + _soft_time_ctrl->get_async_queue().push_with_pop_on_full(metadata); + this->tx_stream_on_off(false); } return num_samps_sent; @@ -433,6 +502,9 @@ size_t usrp1_impl::recv( rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, double timeout ){ + //interleave a "soft" inline message into the receive stream: + if (_soft_time_ctrl->get_inline_queue().pop_with_haste(metadata)) return 0; + size_t num_samps_recvd = _io_impl->recv_handler.recv( buffs, nsamps_per_buff, metadata, io_type, @@ -441,18 +513,5 @@ size_t usrp1_impl::recv( _soft_time_ctrl->recv_post(metadata, num_samps_recvd); - //handle the polling for overflow conditions - _io_impl->overflow_poll_samp_count += num_samps_recvd; - if (_io_impl->overflow_poll_samp_count >= _io_impl->rx_samps_per_poll_interval){ - _io_impl->overflow_poll_samp_count = 0; //reset count - boost::uint8_t overflow = 0; - int ret = _fx2_ctrl->usrp_control_read( - VRQ_GET_STATUS, 0, GS_RX_OVERRUN, - &overflow, sizeof(overflow) - ); - if (ret < 0) UHD_MSG(error) << "USRP: overflow check failed" << std::endl; - else if (overflow) UHD_MSG(fastpath) << "O"; - } - return num_samps_recvd; } diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index 1bab34e7b..cf602185d 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -16,8 +16,7 @@ // #include "soft_time_ctrl.hpp" -#include -#include +#include #include #include #include @@ -41,6 +40,8 @@ public: _nsamps_remaining(0), _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS), _cmd_queue(2), + _async_msg_queue(100), + _inline_msg_queue(100), _stream_on_off(stream_on_off) { //synchronously spawn a new thread @@ -102,11 +103,20 @@ public: //When to stop streaming: //The samples have been received and the stream mode is non-continuous. //Rewrite the sample count to clip to the requested number of samples. - if (_nsamps_remaining <= nsamps){ + if (_nsamps_remaining <= nsamps) switch(_stream_mode){ + case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE:{ + rx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.error_code = rx_metadata_t::ERROR_CODE_BROKEN_CHAIN; + _inline_msg_queue.push_with_pop_on_full(metadata); + } //continue to next case... + case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE: nsamps = _nsamps_remaining; //set nsamps, then stop md.end_of_burst = true; stream_on_off(false); return; + default: break; } //update the consumed samples @@ -114,7 +124,7 @@ public: } void issue_stream_cmd(const stream_cmd_t &cmd){ - _cmd_queue.push_with_wait(cmd); + _cmd_queue.push_with_wait(boost::make_shared(cmd)); } void stream_on_off(bool enb){ @@ -134,7 +144,12 @@ public: //handle late packets if (time_at < time_now()){ - //TODO post async message + async_metadata_t metadata; + metadata.channel = 0; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.event_code = async_metadata_t::EVENT_CODE_TIME_ERROR; + _async_msg_queue.push_with_pop_on_full(metadata); return true; } @@ -153,7 +168,12 @@ public: if (not cmd.stream_now){ time_spec_t time_at(cmd.time_spec - TWIDDLE); if (time_at < time_now()){ - //TODO inject late cmd inline error + rx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.error_code = rx_metadata_t::ERROR_CODE_LATE_COMMAND; + _inline_msg_queue.push_with_pop_on_full(metadata); + _stream_mode = stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS; } else{ sleep_until_time(lock, time_at); @@ -180,20 +200,30 @@ public: void recv_cmd_dispatcher(boost::barrier &spawn_barrier){ spawn_barrier.wait(); try{ - boost::any cmd; + boost::shared_ptr cmd; while (true){ _cmd_queue.pop_with_wait(cmd); - recv_cmd_handle_cmd(boost::any_cast(cmd)); + recv_cmd_handle_cmd(*cmd); } } catch(const boost::thread_interrupted &){} } + bounded_buffer &get_async_queue(void){ + return _async_msg_queue; + } + + bounded_buffer &get_inline_queue(void){ + return _inline_msg_queue; + } + private: boost::mutex _update_mutex; size_t _nsamps_remaining; stream_cmd_t::stream_mode_t _stream_mode; time_spec_t _time_offset; - bounded_buffer _cmd_queue; + bounded_buffer > _cmd_queue; + bounded_buffer _async_msg_queue; + bounded_buffer _inline_msg_queue; const cb_fcn_type _stream_on_off; boost::thread_group _thread_group; }; diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.hpp b/host/lib/usrp/usrp1/soft_time_ctrl.hpp index 7fdac7fc8..b2bf6c6f9 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.hpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -45,8 +46,6 @@ public: * \return a new soft time control object */ static sptr make(const cb_fcn_type &stream_on_off); - //TODO pass in the error queue for async msgs - //TODO pass in the queue for inline msgs //! Set the current time virtual void set_time(const time_spec_t &time) = 0; @@ -62,6 +61,12 @@ public: //! Issue a stream command to receive virtual void issue_stream_cmd(const stream_cmd_t &cmd) = 0; + + //! Get access to a buffer of async metadata + virtual transport::bounded_buffer &get_async_queue(void) = 0; + + //! Get access to a buffer of inline metadata + virtual transport::bounded_buffer &get_inline_queue(void) = 0; }; }} //namespace diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp index deb257a83..78137178b 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.cpp +++ b/host/lib/usrp/usrp1/usrp1_impl.cpp @@ -294,8 +294,8 @@ usrp1_impl::usrp1_impl(const device_addr_t &device_addr){ .coerce(boost::bind(&usrp1_impl::update_tx_samp_rate, this, _1)); _tree->create(tx_dsp_path / "freq/value") .coerce(boost::bind(&usrp1_impl::update_tx_dsp_freq, this, dspno, _1)); - _tree->create(tx_dsp_path / "freq/range") - .set(meta_range_t(-_master_clock_rate/2, +_master_clock_rate/2)); + _tree->create(tx_dsp_path / "freq/range") //magic scalar comes from codec control: + .set(meta_range_t(-_master_clock_rate*0.6875, +_master_clock_rate*0.6875)); } //////////////////////////////////////////////////////////////////// diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index f156d0bc4..cb1497253 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #ifndef INCLUDED_USRP1_IMPL_HPP @@ -116,6 +117,7 @@ private: UHD_PIMPL_DECL(io_impl) _io_impl; void io_init(void); void rx_stream_on_off(bool); + void tx_stream_on_off(bool); void handle_overrun(size_t); //otw types @@ -128,6 +130,8 @@ private: bool has_rx_halfband(void); bool has_tx_halfband(void); + void vandal_conquest_loop(boost::barrier &); + //handle the enables bool _rx_enabled, _tx_enabled; void enable_rx(bool enb){ -- cgit v1.2.3