// // Copyright 2011-2012,2014 Ettus Research LLC // Copyright 2018 Ettus Research, a National Instruments Company // // SPDX-License-Identifier: GPL-3.0-or-later // #include "soft_time_ctrl.hpp" #include #include #include #include #include #include #include using namespace uhd; using namespace uhd::usrp; using namespace uhd::transport; namespace pt = boost::posix_time; static const time_spec_t TWIDDLE(0.0011); soft_time_ctrl::~soft_time_ctrl(void) { /* NOP */ } /*********************************************************************** * Soft time control implementation **********************************************************************/ class soft_time_ctrl_impl : public soft_time_ctrl { public: soft_time_ctrl_impl(const cb_fcn_type& stream_on_off) : _nsamps_remaining(0) , _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) , _cmd_queue(2) , _async_msg_queue(1000) , _inline_msg_queue(1000) , _stream_on_off(stream_on_off) { // synchronously spawn a new thread _recv_cmd_task = task::make(std::bind(&soft_time_ctrl_impl::recv_cmd_task, this)); // initialize the time to something this->set_time(time_spec_t(0.0)); } /******************************************************************* * Time control ******************************************************************/ void set_time(const time_spec_t& time) override { std::lock_guard lock(_update_mutex); _time_offset = uhd::get_system_time() - time; } time_spec_t get_time(void) override { std::lock_guard lock(_update_mutex); return time_now(); } UHD_INLINE time_spec_t time_now(void) { // internal get time without scoped lock return uhd::get_system_time() - _time_offset; } UHD_INLINE void sleep_until_time( std::unique_lock& lock, const time_spec_t& time) { boost::condition cond; // use a condition variable to unlock, sleep, lock const double seconds_to_sleep = (time - time_now()).get_real_secs(); cond.timed_wait(lock, pt::microseconds(long(seconds_to_sleep * 1e6))); } /******************************************************************* * Receive control ******************************************************************/ size_t recv_post(rx_metadata_t& md, const size_t nsamps) override { std::lock_guard lock(_update_mutex); // Since it timed out on the receive, check for inline messages... // Must do a post check because recv() will not wake up for a message. if (md.error_code == rx_metadata_t::ERROR_CODE_TIMEOUT) { if (_inline_msg_queue.pop_with_haste(md)) return 0; } // load the metadata with the expected time md.has_time_spec = true; md.time_spec = time_now(); // none of the stuff below matters in continuous streaming mode if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return nsamps; // When to stop streaming: // The samples have been received and the stream mode is non-continuous. // Rewrite the sample count to clip to the requested number of samples. if (_nsamps_remaining <= nsamps) switch (_stream_mode) { case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE: { rx_metadata_t metadata; metadata.has_time_spec = true; metadata.time_spec = this->time_now(); metadata.error_code = rx_metadata_t::ERROR_CODE_BROKEN_CHAIN; _inline_msg_queue.push_with_pop_on_full(metadata); } // continue to next case... UHD_FALLTHROUGH case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE: md.end_of_burst = true; this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); return _nsamps_remaining; default: break; } // update the consumed samples _nsamps_remaining -= nsamps; return nsamps; } void issue_stream_cmd(const stream_cmd_t& cmd) override { _cmd_queue.push_with_wait(std::make_shared(cmd)); } void stream_on_off(bool enb) { _stream_on_off(enb); _nsamps_remaining = 0; } /******************************************************************* * Transmit control ******************************************************************/ void send_pre(const tx_metadata_t& md, double& timeout) override { if (not md.has_time_spec) return; std::unique_lock lock(_update_mutex); time_spec_t time_at(md.time_spec - TWIDDLE); // handle late packets if (time_at < time_now()) { async_metadata_t metadata; metadata.channel = 0; metadata.has_time_spec = true; metadata.time_spec = this->time_now(); metadata.event_code = async_metadata_t::EVENT_CODE_TIME_ERROR; _async_msg_queue.push_with_pop_on_full(metadata); return; } timeout -= (time_at - time_now()).get_real_secs(); sleep_until_time(lock, time_at); } /******************************************************************* * Thread control ******************************************************************/ void recv_cmd_handle_cmd(const stream_cmd_t& cmd) { std::unique_lock lock(_update_mutex); // handle the stream at time by sleeping if (not cmd.stream_now) { time_spec_t time_at(cmd.time_spec - TWIDDLE); if (time_at < time_now()) { rx_metadata_t metadata; metadata.has_time_spec = true; metadata.time_spec = this->time_now(); metadata.error_code = rx_metadata_t::ERROR_CODE_LATE_COMMAND; _inline_msg_queue.push_with_pop_on_full(metadata); this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); return; } else { sleep_until_time(lock, time_at); } } // When to stop streaming: // Stop streaming when the command is a stop and streaming. if (cmd.stream_mode == stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS and _stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) stream_on_off(false); // When to start streaming: // Start streaming when the command is not a stop and not streaming. if (cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS and _stream_mode == stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS) stream_on_off(true); // update the state _nsamps_remaining += cmd.num_samps; _stream_mode = cmd.stream_mode; } void recv_cmd_task(void) { // task is looped std::shared_ptr cmd; if (_cmd_queue.pop_with_timed_wait(cmd, 0.25)) { recv_cmd_handle_cmd(*cmd); } } bounded_buffer& get_async_queue(void) override { return _async_msg_queue; } bounded_buffer& get_inline_queue(void) override { return _inline_msg_queue; } void stop(void) override { _recv_cmd_task.reset(); } private: std::mutex _update_mutex; size_t _nsamps_remaining; stream_cmd_t::stream_mode_t _stream_mode; time_spec_t _time_offset; bounded_buffer> _cmd_queue; bounded_buffer _async_msg_queue; bounded_buffer _inline_msg_queue; const cb_fcn_type _stream_on_off; task::sptr _recv_cmd_task; }; /*********************************************************************** * Soft time control factor **********************************************************************/ soft_time_ctrl::sptr soft_time_ctrl::make(const cb_fcn_type& stream_on_off) { return sptr(new soft_time_ctrl_impl(stream_on_off)); }