From 1ef40895952f94ccd21fca48033b5a14d7e4ff30 Mon Sep 17 00:00:00 2001
From: Josh Blum <josh@joshknows.com>
Date: Sat, 2 Jul 2011 19:35:33 -0700
Subject: usrp1: tweaks + implemented other features to mimic async and inline
 messages

Moved the underflow/overflow polling into a thread and out of the fast-path.
Added an inline and async message queue into soft time control.
Error and status messages are actually generated now and enqueued.
Passes the async message test...
---
 host/lib/usrp/e100/io_impl.cpp         |   5 ++
 host/lib/usrp/usrp1/io_impl.cpp        | 155 +++++++++++++++++++++++----------
 host/lib/usrp/usrp1/soft_time_ctrl.cpp |  48 ++++++++--
 host/lib/usrp/usrp1/soft_time_ctrl.hpp |   9 +-
 host/lib/usrp/usrp1/usrp1_impl.cpp     |   4 +-
 host/lib/usrp/usrp1/usrp1_impl.hpp     |   4 +
 6 files changed, 164 insertions(+), 61 deletions(-)

(limited to 'host/lib/usrp')

diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp
index 14b348f96..6e1bc3245 100644
--- a/host/lib/usrp/e100/io_impl.cpp
+++ b/host/lib/usrp/e100/io_impl.cpp
@@ -51,6 +51,11 @@ struct e100_impl::io_impl{
         false_alarm(0), async_msg_fifo(100/*messages deep*/)
     { /* NOP */ }
 
+    ~io_impl(void){
+        recv_pirate_crew.interrupt_all();
+        recv_pirate_crew.join_all();
+    }
+
     double tick_rate; //set by update tick rate method
     e100_ctrl::sptr iface; //so handle irq can peek and poke
     void handle_irq(void);
diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp
index 8e75f2025..461529ae2 100644
--- a/host/lib/usrp/usrp1/io_impl.cpp
+++ b/host/lib/usrp/usrp1/io_impl.cpp
@@ -28,6 +28,7 @@
 #include <uhd/transport/bounded_buffer.hpp>
 #include <boost/math/special_functions/sign.hpp>
 #include <boost/math/special_functions/round.hpp>
+#include <boost/thread/thread.hpp>
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
@@ -123,8 +124,6 @@ static void usrp1_bs_vrt_unpacker(
 struct usrp1_impl::io_impl{
     io_impl(zero_copy_if::sptr data_transport):
         data_transport(data_transport),
-        underflow_poll_samp_count(0),
-        overflow_poll_samp_count(0),
         curr_buff(offset_send_buffer(data_transport->get_send_buff())),
         omsb(boost::bind(&usrp1_impl::io_impl::commit_send_buff, this, _1, _2, _3))
     {
@@ -132,6 +131,8 @@ struct usrp1_impl::io_impl{
     }
 
     ~io_impl(void){
+        vandal_tribe.interrupt_all();
+        vandal_tribe.join_all();
         UHD_SAFE_CALL(flush_send_buff();)
     }
 
@@ -141,12 +142,6 @@ struct usrp1_impl::io_impl{
     sph::recv_packet_handler recv_handler;
     sph::send_packet_handler send_handler;
 
-    //state management for overflow and underflow
-    size_t underflow_poll_samp_count;
-    size_t overflow_poll_samp_count;
-    size_t rx_samps_per_poll_interval;
-    size_t tx_samps_per_poll_interval;
-
     //wrapper around the actual send buffer interface
     //all of this to ensure only aligned lengths are committed
     //NOTE: you must commit before getting a new buffer
@@ -163,6 +158,9 @@ struct usrp1_impl::io_impl{
         //make a new managed buffer with the offset buffs
         return omsb.get_new(curr_buff, next_buff);
     }
+
+    boost::thread_group vandal_tribe;
+    boost::system_time last_send_time;
 };
 
 /*!
@@ -231,6 +229,14 @@ void usrp1_impl::io_init(void){
 
     _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport));
 
+    //create a new vandal thread to poll xerflow conditions
+    boost::barrier spawn_barrier(2);
+    _io_impl->vandal_tribe.create_thread(boost::bind(
+        &usrp1_impl::vandal_conquest_loop,
+        this, boost::ref(spawn_barrier)
+    ));
+    spawn_barrier.wait();
+
     //init some handler stuff
     _io_impl->recv_handler.set_tick_rate(_master_clock_rate);
     _io_impl->recv_handler.set_vrt_unpacker(&usrp1_bs_vrt_unpacker);
@@ -243,23 +249,91 @@ void usrp1_impl::io_init(void){
         &usrp1_impl::io_impl::get_send_buff, _io_impl.get(), _1
     ));
 
-    this->enable_tx(true); //always enabled
+    //init as disabled, then call the real function (uses restore)
+    this->enable_rx(false);
+    this->enable_tx(false);
     rx_stream_on_off(false);
+    tx_stream_on_off(false);
     _io_impl->flush_send_buff();
 }
 
 void usrp1_impl::rx_stream_on_off(bool enb){
-    this->enable_rx(enb);
+    this->restore_rx(enb);
     //drain any junk in the receive transport after stop streaming command
     while(not enb and _data_transport->get_recv_buff().get() != NULL){
         /* NOP */
     }
 }
 
+void usrp1_impl::tx_stream_on_off(bool enb){
+    _io_impl->last_send_time = boost::get_system_time();
+    if (_tx_enabled and not enb) _io_impl->flush_send_buff();
+    this->restore_tx(enb);
+}
+
+/*!
+ * Casually poll the overflow and underflow registers.
+ * On an underflow, push an async message into the queue and print.
+ * On an overflow, interleave an inline message into recv and print.
+ * This procedure creates "soft" inline and async user messages.
+ */
+void usrp1_impl::vandal_conquest_loop(boost::barrier &spawn_barrier){
+    spawn_barrier.wait();
+
+    //initialize the async metadata
+    async_metadata_t async_metadata;
+    async_metadata.channel = 0;
+    async_metadata.has_time_spec = true;
+    async_metadata.event_code = async_metadata_t::EVENT_CODE_UNDERFLOW;
+
+    //initialize the inline metadata
+    rx_metadata_t inline_metadata;
+    inline_metadata.has_time_spec = true;
+    inline_metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+
+    //start the polling loop...
+    try{ while (not boost::this_thread::interruption_requested()){
+        boost::uint8_t underflow = 0, overflow = 0;
+
+        //shutoff transmit if it has been too long since send() was called
+        if (_tx_enabled and (boost::get_system_time() - _io_impl->last_send_time) > boost::posix_time::milliseconds(100)){
+            this->tx_stream_on_off(false);
+        }
+
+        //always poll regardless of enabled so we can clear the conditions
+        _fx2_ctrl->usrp_control_read(
+            VRQ_GET_STATUS, 0, GS_TX_UNDERRUN, &underflow, sizeof(underflow)
+        );
+        _fx2_ctrl->usrp_control_read(
+            VRQ_GET_STATUS, 0, GS_RX_OVERRUN, &overflow, sizeof(overflow)
+        );
+
+        //handle message generation for xerflow conditions
+        if (_tx_enabled and underflow){
+            async_metadata.time_spec = _soft_time_ctrl->get_time();
+            _soft_time_ctrl->get_async_queue().push_with_pop_on_full(async_metadata);
+            UHD_MSG(fastpath) << "U";
+        }
+        if (_rx_enabled and overflow){
+            inline_metadata.time_spec = _soft_time_ctrl->get_time();
+            _soft_time_ctrl->get_inline_queue().push_with_pop_on_full(inline_metadata);
+            UHD_MSG(fastpath) << "O";
+        }
+
+        boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    }}
+    catch(const boost::thread_interrupted &){} //normal exit condition
+    catch(const std::exception &e){
+        UHD_MSG(error) << "The vandal caught an unexpected exception " << e.what() << std::endl;
+    }
+}
+
 /***********************************************************************
  * Properties callback methods below
  **********************************************************************/
 void usrp1_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){
+    boost::mutex::scoped_lock lock = _io_impl->recv_handler.get_scoped_lock();
+
     //sanity checking
     validate_subdev_spec(_tree, spec, "rx");
 
@@ -281,6 +355,8 @@ void usrp1_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){
 }
 
 void usrp1_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){
+    boost::mutex::scoped_lock lock = _io_impl->send_handler.get_scoped_lock();
+
     //sanity checking
     validate_subdev_spec(_tree, spec, "tx");
 
@@ -305,14 +381,13 @@ void usrp1_impl::update_tx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){
 }
 
 double usrp1_impl::update_rx_samp_rate(const double samp_rate){
+    boost::mutex::scoped_lock lock = _io_impl->recv_handler.get_scoped_lock();
+
     size_t rate = boost::math::iround(_master_clock_rate / samp_rate);
 
     //clip the rate to something in range:
     rate = std::min<size_t>(std::max<size_t>(rate, 4), 256);
 
-    //TODO Poll every 100ms. Make it selectable?
-    _io_impl->rx_samps_per_poll_interval = size_t(0.1 * _master_clock_rate / rate);
-
     bool s = this->disable_rx();
     _iface->poke32(FR_DECIM_RATE, rate/2 - 1);
     this->restore_rx(s);
@@ -322,14 +397,13 @@ double usrp1_impl::update_rx_samp_rate(const double samp_rate){
 }
 
 double usrp1_impl::update_tx_samp_rate(const double samp_rate){
+    boost::mutex::scoped_lock lock = _io_impl->send_handler.get_scoped_lock();
+
     size_t rate = boost::math::iround(_master_clock_rate / samp_rate);
 
     //clip the rate to something in range:
     rate = std::min<size_t>(std::max<size_t>(rate, 4), 256);
 
-    //TODO Poll every 100ms. Make it selectable? 
-    _io_impl->tx_samps_per_poll_interval = size_t(0.1 * _master_clock_rate / rate);
-
     bool s = this->disable_tx();
     _iface->poke32(FR_INTERP_RATE, rate/2 - 1);
     this->restore_tx(s);
@@ -367,10 +441,11 @@ double usrp1_impl::update_tx_dsp_freq(const size_t dspno, const double freq){
 /***********************************************************************
  * Async Data
  **********************************************************************/
-bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, double timeout){
-    //dummy fill-in for the recv_async_msg
-    boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6)));
-    return false;
+bool usrp1_impl::recv_async_msg(
+    async_metadata_t &async_metadata, double timeout
+){
+    boost::this_thread::disable_interruption di; //disable because the wait can throw
+    return _soft_time_ctrl->get_async_queue().pop_with_timed_wait(async_metadata, timeout);
 }
 
 /***********************************************************************
@@ -390,29 +465,23 @@ size_t usrp1_impl::send(
 ){
     if (_soft_time_ctrl->send_pre(metadata, timeout)) return 0;
 
+    this->tx_stream_on_off(true); //always enable (it will do the right thing)
     size_t num_samps_sent = _io_impl->send_handler.send(
         buffs, nsamps_per_buff,
         metadata, io_type,
         send_mode, timeout
     );
 
-    //handle eob flag (commit the buffer, disable the DACs)
+    //handle eob flag (commit the buffer, /*disable the DACs*/)
     //check num samps sent to avoid flush on incomplete/timeout
     if (metadata.end_of_burst and num_samps_sent == nsamps_per_buff){
-        _io_impl->flush_send_buff();
-    }
-
-    //handle the polling for underflow conditions
-    _io_impl->underflow_poll_samp_count += num_samps_sent;
-    if (_io_impl->underflow_poll_samp_count >= _io_impl->tx_samps_per_poll_interval){
-        _io_impl->underflow_poll_samp_count = 0; //reset count
-        boost::uint8_t underflow = 0;
-        int ret = _fx2_ctrl->usrp_control_read(
-            VRQ_GET_STATUS, 0, GS_TX_UNDERRUN,
-            &underflow, sizeof(underflow)
-        );
-        if (ret < 0)        UHD_MSG(error) << "USRP: underflow check failed" << std::endl;
-        else if (underflow) UHD_MSG(fastpath) << "U";
+        async_metadata_t metadata;
+        metadata.channel = 0;
+        metadata.has_time_spec = true;
+        metadata.time_spec = _soft_time_ctrl->get_time();
+        metadata.event_code = async_metadata_t::EVENT_CODE_BURST_ACK;
+        _soft_time_ctrl->get_async_queue().push_with_pop_on_full(metadata);
+        this->tx_stream_on_off(false);
     }
 
     return num_samps_sent;
@@ -433,6 +502,9 @@ size_t usrp1_impl::recv(
     rx_metadata_t &metadata, const io_type_t &io_type,
     recv_mode_t recv_mode, double timeout
 ){
+    //interleave a "soft" inline message into the receive stream:
+    if (_soft_time_ctrl->get_inline_queue().pop_with_haste(metadata)) return 0;
+
     size_t num_samps_recvd = _io_impl->recv_handler.recv(
         buffs, nsamps_per_buff,
         metadata, io_type,
@@ -441,18 +513,5 @@ size_t usrp1_impl::recv(
 
     _soft_time_ctrl->recv_post(metadata, num_samps_recvd);
 
-    //handle the polling for overflow conditions
-    _io_impl->overflow_poll_samp_count += num_samps_recvd;
-    if (_io_impl->overflow_poll_samp_count >= _io_impl->rx_samps_per_poll_interval){
-        _io_impl->overflow_poll_samp_count = 0; //reset count
-        boost::uint8_t overflow = 0;
-        int ret = _fx2_ctrl->usrp_control_read(
-            VRQ_GET_STATUS, 0, GS_RX_OVERRUN,
-            &overflow, sizeof(overflow)
-        );
-        if (ret < 0)       UHD_MSG(error) << "USRP: overflow check failed" << std::endl;
-        else if (overflow) UHD_MSG(fastpath) << "O";
-    }
-
     return num_samps_recvd;
 }
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
index 1bab34e7b..cf602185d 100644
--- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp
+++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
@@ -16,8 +16,7 @@
 //
 
 #include "soft_time_ctrl.hpp"
-#include <uhd/transport/bounded_buffer.hpp>
-#include <boost/any.hpp>
+#include <boost/make_shared.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/thread/barrier.hpp>
 #include <boost/thread/condition_variable.hpp>
@@ -41,6 +40,8 @@ public:
         _nsamps_remaining(0),
         _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS),
         _cmd_queue(2),
+        _async_msg_queue(100),
+        _inline_msg_queue(100),
         _stream_on_off(stream_on_off)
     {
         //synchronously spawn a new thread
@@ -102,11 +103,20 @@ public:
         //When to stop streaming:
         //The samples have been received and the stream mode is non-continuous.
         //Rewrite the sample count to clip to the requested number of samples.
-        if (_nsamps_remaining <= nsamps){
+        if (_nsamps_remaining <= nsamps) switch(_stream_mode){
+        case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE:{
+            rx_metadata_t metadata;
+            metadata.has_time_spec = true;
+            metadata.time_spec = this->time_now();
+            metadata.error_code = rx_metadata_t::ERROR_CODE_BROKEN_CHAIN;
+            _inline_msg_queue.push_with_pop_on_full(metadata);
+        } //continue to next case...
+        case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE:
             nsamps = _nsamps_remaining; //set nsamps, then stop
             md.end_of_burst = true;
             stream_on_off(false);
             return;
+        default: break;
         }
 
         //update the consumed samples
@@ -114,7 +124,7 @@ public:
     }
 
     void issue_stream_cmd(const stream_cmd_t &cmd){
-        _cmd_queue.push_with_wait(cmd);
+        _cmd_queue.push_with_wait(boost::make_shared<stream_cmd_t>(cmd));
     }
 
     void stream_on_off(bool enb){
@@ -134,7 +144,12 @@ public:
 
         //handle late packets
         if (time_at < time_now()){
-            //TODO post async message
+            async_metadata_t metadata;
+            metadata.channel = 0;
+            metadata.has_time_spec = true;
+            metadata.time_spec = this->time_now();
+            metadata.event_code = async_metadata_t::EVENT_CODE_TIME_ERROR;
+            _async_msg_queue.push_with_pop_on_full(metadata);
             return true;
         }
 
@@ -153,7 +168,12 @@ public:
         if (not cmd.stream_now){
             time_spec_t time_at(cmd.time_spec - TWIDDLE);
             if (time_at < time_now()){
-                //TODO inject late cmd inline error
+                rx_metadata_t metadata;
+                metadata.has_time_spec = true;
+                metadata.time_spec = this->time_now();
+                metadata.error_code = rx_metadata_t::ERROR_CODE_LATE_COMMAND;
+                _inline_msg_queue.push_with_pop_on_full(metadata);
+                _stream_mode = stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS;
             }
             else{
                 sleep_until_time(lock, time_at);
@@ -180,20 +200,30 @@ public:
     void recv_cmd_dispatcher(boost::barrier &spawn_barrier){
         spawn_barrier.wait();
         try{
-            boost::any cmd;
+            boost::shared_ptr<stream_cmd_t> cmd;
             while (true){
                 _cmd_queue.pop_with_wait(cmd);
-                recv_cmd_handle_cmd(boost::any_cast<stream_cmd_t>(cmd));
+                recv_cmd_handle_cmd(*cmd);
             }
         } catch(const boost::thread_interrupted &){}
     }
 
+    bounded_buffer<async_metadata_t> &get_async_queue(void){
+        return _async_msg_queue;
+    }
+
+    bounded_buffer<rx_metadata_t> &get_inline_queue(void){
+        return _inline_msg_queue;
+    }
+
 private:
     boost::mutex _update_mutex;
     size_t _nsamps_remaining;
     stream_cmd_t::stream_mode_t _stream_mode;
     time_spec_t _time_offset;
-    bounded_buffer<boost::any> _cmd_queue;
+    bounded_buffer<boost::shared_ptr<stream_cmd_t> > _cmd_queue;
+    bounded_buffer<async_metadata_t> _async_msg_queue;
+    bounded_buffer<rx_metadata_t> _inline_msg_queue;
     const cb_fcn_type _stream_on_off;
     boost::thread_group _thread_group;
 };
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.hpp b/host/lib/usrp/usrp1/soft_time_ctrl.hpp
index 7fdac7fc8..b2bf6c6f9 100644
--- a/host/lib/usrp/usrp1/soft_time_ctrl.hpp
+++ b/host/lib/usrp/usrp1/soft_time_ctrl.hpp
@@ -21,6 +21,7 @@
 #include <uhd/types/stream_cmd.hpp>
 #include <uhd/types/time_spec.hpp>
 #include <uhd/types/metadata.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
 #include <boost/utility.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/function.hpp>
@@ -45,8 +46,6 @@ public:
      * \return a new soft time control object
      */
     static sptr make(const cb_fcn_type &stream_on_off);
-        //TODO pass in the error queue for async msgs
-        //TODO pass in the queue for inline msgs
 
     //! Set the current time
     virtual void set_time(const time_spec_t &time) = 0;
@@ -62,6 +61,12 @@ public:
 
     //! Issue a stream command to receive
     virtual void issue_stream_cmd(const stream_cmd_t &cmd) = 0;
+
+    //! Get access to a buffer of async metadata
+    virtual transport::bounded_buffer<async_metadata_t> &get_async_queue(void) = 0;
+
+    //! Get access to a buffer of inline metadata
+    virtual transport::bounded_buffer<rx_metadata_t> &get_inline_queue(void) = 0;
 };
 
 }} //namespace
diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp
index deb257a83..78137178b 100644
--- a/host/lib/usrp/usrp1/usrp1_impl.cpp
+++ b/host/lib/usrp/usrp1/usrp1_impl.cpp
@@ -294,8 +294,8 @@ usrp1_impl::usrp1_impl(const device_addr_t &device_addr){
             .coerce(boost::bind(&usrp1_impl::update_tx_samp_rate, this, _1));
         _tree->create<double>(tx_dsp_path / "freq/value")
             .coerce(boost::bind(&usrp1_impl::update_tx_dsp_freq, this, dspno, _1));
-        _tree->create<meta_range_t>(tx_dsp_path / "freq/range")
-            .set(meta_range_t(-_master_clock_rate/2, +_master_clock_rate/2));
+        _tree->create<meta_range_t>(tx_dsp_path / "freq/range") //magic scalar comes from codec control:
+            .set(meta_range_t(-_master_clock_rate*0.6875, +_master_clock_rate*0.6875));
     }
 
     ////////////////////////////////////////////////////////////////////
diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp
index f156d0bc4..cb1497253 100644
--- a/host/lib/usrp/usrp1/usrp1_impl.hpp
+++ b/host/lib/usrp/usrp1/usrp1_impl.hpp
@@ -30,6 +30,7 @@
 #include <uhd/usrp/subdev_spec.hpp>
 #include <uhd/usrp/dboard_eeprom.hpp>
 #include <uhd/usrp/dboard_manager.hpp>
+#include <boost/thread/barrier.hpp>
 #include <uhd/transport/usb_zero_copy.hpp>
 
 #ifndef INCLUDED_USRP1_IMPL_HPP
@@ -116,6 +117,7 @@ private:
     UHD_PIMPL_DECL(io_impl) _io_impl;
     void io_init(void);
     void rx_stream_on_off(bool);
+    void tx_stream_on_off(bool);
     void handle_overrun(size_t);
 
     //otw types
@@ -128,6 +130,8 @@ private:
     bool has_rx_halfband(void);
     bool has_tx_halfband(void);
 
+    void vandal_conquest_loop(boost::barrier &);
+
     //handle the enables
     bool _rx_enabled, _tx_enabled;
     void enable_rx(bool enb){
-- 
cgit v1.2.3