diff options
Diffstat (limited to 'host/lib/usrp/usrp1/soft_time_ctrl.cpp')
| -rw-r--r-- | host/lib/usrp/usrp1/soft_time_ctrl.cpp | 206 | 
1 files changed, 206 insertions, 0 deletions
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp new file mode 100644 index 000000000..1bab34e7b --- /dev/null +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -0,0 +1,206 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include "soft_time_ctrl.hpp" +#include <uhd/transport/bounded_buffer.hpp> +#include <boost/any.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/barrier.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <iostream> + +using namespace uhd; +using namespace uhd::usrp; +using namespace uhd::transport; +namespace pt = boost::posix_time; + +static const time_spec_t TWIDDLE(0.0011); + +/*********************************************************************** + * Soft time control implementation + **********************************************************************/ +class soft_time_ctrl_impl : public soft_time_ctrl{ +public: + +    soft_time_ctrl_impl(const cb_fcn_type &stream_on_off): +        _nsamps_remaining(0), +        _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS), +        _cmd_queue(2), +        _stream_on_off(stream_on_off) +    { +        //synchronously spawn a new thread +        boost::barrier spawn_barrier(2); +        _thread_group.create_thread(boost::bind( +            &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier)) +        ); +        spawn_barrier.wait(); + +        //initialize the time to something +        this->set_time(time_spec_t(0.0)); +    } + +    ~soft_time_ctrl_impl(void){ +        _thread_group.interrupt_all(); +        _thread_group.join_all(); +    } + +    /******************************************************************* +     * Time control +     ******************************************************************/ +    void set_time(const time_spec_t &time){ +        boost::mutex::scoped_lock lock(_update_mutex); +        _time_offset = time_spec_t::get_system_time() - time; +    } + +    time_spec_t get_time(void){ +        boost::mutex::scoped_lock lock(_update_mutex); +        return time_now(); +    } + +    UHD_INLINE time_spec_t time_now(void){ +        //internal get time without scoped lock +        return time_spec_t::get_system_time() - _time_offset; +    } + +    UHD_INLINE void sleep_until_time( +        boost::mutex::scoped_lock &lock, const time_spec_t &time +    ){ +        boost::condition_variable cond; +        //use a condition variable to unlock, sleep, lock +        double seconds_to_sleep = (time - time_now()).get_real_secs(); +        cond.timed_wait(lock, pt::microseconds(long(seconds_to_sleep*1e6))); +    } + +    /******************************************************************* +     * Receive control +     ******************************************************************/ +    void recv_post(rx_metadata_t &md, size_t &nsamps){ +        boost::mutex::scoped_lock lock(_update_mutex); + +        //load the metadata with the expected time +        md.has_time_spec = true; +        md.time_spec = time_now(); + +        //none of the stuff below matters in continuous streaming mode +        if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return; + +        //When to stop streaming: +        //The samples have been received and the stream mode is non-continuous. +        //Rewrite the sample count to clip to the requested number of samples. +        if (_nsamps_remaining <= nsamps){ +            nsamps = _nsamps_remaining; //set nsamps, then stop +            md.end_of_burst = true; +            stream_on_off(false); +            return; +        } + +        //update the consumed samples +        _nsamps_remaining -= nsamps; +    } + +    void issue_stream_cmd(const stream_cmd_t &cmd){ +        _cmd_queue.push_with_wait(cmd); +    } + +    void stream_on_off(bool enb){ +        _stream_on_off(enb); +        _nsamps_remaining = 0; +    } + +    /******************************************************************* +     * Transmit control +     ******************************************************************/ +    bool send_pre(const tx_metadata_t &md, double &timeout){ +        if (not md.has_time_spec) return false; + +        boost::mutex::scoped_lock lock(_update_mutex); + +        time_spec_t time_at(md.time_spec - TWIDDLE); + +        //handle late packets +        if (time_at < time_now()){ +            //TODO post async message +            return true; +        } + +        timeout -= (time_at - time_now()).get_real_secs(); +        sleep_until_time(lock, time_at); +        return false; +    } + +    /******************************************************************* +     * Thread control +     ******************************************************************/ +    void recv_cmd_handle_cmd(const stream_cmd_t &cmd){ +        boost::mutex::scoped_lock lock(_update_mutex); + +        //handle the stream at time by sleeping +        if (not cmd.stream_now){ +            time_spec_t time_at(cmd.time_spec - TWIDDLE); +            if (time_at < time_now()){ +                //TODO inject late cmd inline error +            } +            else{ +                sleep_until_time(lock, time_at); +            } +        } + +        //When to stop streaming: +        //Stop streaming when the command is a stop and streaming. +        if (cmd.stream_mode == stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS +           and _stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS +        ) stream_on_off(false); + +        //When to start streaming: +        //Start streaming when the command is not a stop and not streaming. +        if (cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS +           and _stream_mode == stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS +        ) stream_on_off(true); + +        //update the state +        _nsamps_remaining += cmd.num_samps; +        _stream_mode = cmd.stream_mode; +    } + +    void recv_cmd_dispatcher(boost::barrier &spawn_barrier){ +        spawn_barrier.wait(); +        try{ +            boost::any cmd; +            while (true){ +                _cmd_queue.pop_with_wait(cmd); +                recv_cmd_handle_cmd(boost::any_cast<stream_cmd_t>(cmd)); +            } +        } catch(const boost::thread_interrupted &){} +    } + +private: +    boost::mutex _update_mutex; +    size_t _nsamps_remaining; +    stream_cmd_t::stream_mode_t _stream_mode; +    time_spec_t _time_offset; +    bounded_buffer<boost::any> _cmd_queue; +    const cb_fcn_type _stream_on_off; +    boost::thread_group _thread_group; +}; + +/*********************************************************************** + * Soft time control factor + **********************************************************************/ +soft_time_ctrl::sptr soft_time_ctrl::make(const cb_fcn_type &stream_on_off){ +    return sptr(new soft_time_ctrl_impl(stream_on_off)); +}  | 
