//
// Copyright 2011 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
#include "soft_time_ctrl.hpp"
#include
#include
#include
#include
#include
#include
using namespace uhd;
using namespace uhd::usrp;
using namespace uhd::transport;
namespace pt = boost::posix_time;
namespace lt = boost::local_time;
/***********************************************************************
* Utility helper functions
**********************************************************************/
//TODO put these in time_spec_t (maybe useful)
//Convert a boost posix time duration into a time spec:
//the duration is split into its whole seconds and its fractional ticks,
//and the tick rate of the duration type is passed along with the ticks.
time_spec_t time_dur_to_time_spec(const pt::time_duration &time_dur){
    const pt::time_duration::sec_type whole_secs = time_dur.total_seconds();
    const pt::time_duration::fractional_seconds_type frac_ticks = time_dur.fractional_seconds();
    const pt::time_duration::tick_type ticks_per_sec = pt::time_duration::ticks_per_second();
    return time_spec_t(whole_secs, frac_ticks, ticks_per_sec);
}
//Convert a time spec into a boost posix time duration:
//the full seconds go into the seconds field and the fractional part
//is requested from the time spec in ticks of the duration type.
pt::time_duration time_spec_to_time_dur(const time_spec_t &time_spec){
    const pt::time_duration::tick_type ticks_per_sec = pt::time_duration::ticks_per_second();
    return pt::time_duration(
        0, //hours
        0, //minutes
        time_spec.get_full_secs(),
        time_spec.get_tick_count(ticks_per_sec)
    );
}
/***********************************************************************
* Soft time control implementation
**********************************************************************/
class soft_time_ctrl_impl : public soft_time_ctrl{
public:
soft_time_ctrl_impl(const cb_fcn_type &stream_on_off):
_nsamps_remaining(0),
_stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS),
_cmd_queue(bounded_buffer::make(2)),
_stream_on_off(stream_on_off)
{
//synchronously spawn a new thread
_update_mutex.lock(); //lock mutex before spawned
_thread_group.create_thread(boost::bind(&soft_time_ctrl_impl::recv_cmd_dispatcher, this));
_update_mutex.lock(); //lock blocks until spawned
_update_mutex.unlock(); //unlock mutex before done
}
~soft_time_ctrl_impl(void){
_thread_running = false;
_thread_group.join_all();
}
void set_time(const time_spec_t &time){
_time_offset = pt::microsec_clock::universal_time() - time_spec_to_time_dur(time);
}
time_spec_t get_time(void){
return time_dur_to_time_spec(pt::microsec_clock::universal_time() - _time_offset);
}
void recv_post(rx_metadata_t &md, size_t &nsamps){
//load the metadata with the current time
md.has_time_spec = true;
md.time_spec = get_time();
//lock the mutex here before changing state
boost::mutex::scoped_lock lock(_update_mutex);
//When to stop streaming:
//The samples have been received and the stream mode is non-continuous.
//Rewrite the sample count to clip to the requested number of samples.
if (_nsamps_remaining <= nsamps and
_stream_mode != stream_cmd_t::STREAM_MODE_START_CONTINUOUS
){
nsamps = _nsamps_remaining; //set nsamps, then stop
stream_on_off(false);
return;
}
//update the consumed samples
_nsamps_remaining -= nsamps;
}
void send_pre(const tx_metadata_t &md, double /*TODO timeout*/){
if (not md.has_time_spec) return;
sleep_until_time(md.time_spec); //TODO late?
}
void issue_stream_cmd(const stream_cmd_t &cmd){
_cmd_queue->push_with_wait(cmd);
}
private:
void sleep_until_time(const time_spec_t &time){
boost::this_thread::sleep(_time_offset + time_spec_to_time_dur(time));
}
void recv_cmd_handle_cmd(const stream_cmd_t &cmd){
//handle the stream at time by sleeping
if (not cmd.stream_now) sleep_until_time(cmd.time_spec); //TODO late?
//lock the mutex here before changing state
boost::mutex::scoped_lock lock(_update_mutex);
//When to stop streaming:
//Stop streaming when the command is a stop and streaming.
if (cmd.stream_mode == stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
and _stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
) stream_on_off(false);
//When to start streaming:
//Start streaming when the command is not a stop and not streaming.
if (cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
and _stream_mode == stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
) stream_on_off(true);
//update the state
_nsamps_remaining += cmd.num_samps;
_stream_mode = cmd.stream_mode;
}
void recv_cmd_dispatcher(void){
_thread_running = true;
_update_mutex.unlock();
boost::any cmd;
while (_thread_running){
if (_cmd_queue->pop_with_timed_wait(cmd, 1.0)){
recv_cmd_handle_cmd(boost::any_cast(cmd));
}
}
}
void stream_on_off(bool stream){
_stream_on_off(stream);
_nsamps_remaining = 0;
}
boost::mutex _update_mutex;
size_t _nsamps_remaining;
stream_cmd_t::stream_mode_t _stream_mode;
pt::ptime _time_offset;
bounded_buffer::sptr _cmd_queue;
const cb_fcn_type _stream_on_off;
boost::thread_group _thread_group;
bool _thread_running;
};
/***********************************************************************
* Soft time control factory
**********************************************************************/
//Factory function: construct the implementation with the given stream
//on/off callback and hand it back through the interface pointer type.
soft_time_ctrl::sptr soft_time_ctrl::make(const cb_fcn_type &stream_on_off){
    sptr ctrl(new soft_time_ctrl_impl(stream_on_off));
    return ctrl;
}