aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/usrp/usrp1/soft_time_ctrl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/usrp/usrp1/soft_time_ctrl.cpp')
-rw-r--r--host/lib/usrp/usrp1/soft_time_ctrl.cpp65
1 files changed, 51 insertions, 14 deletions
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
index 1bab34e7b..ac0899e28 100644
--- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp
+++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
@@ -16,8 +16,7 @@
//
#include "soft_time_ctrl.hpp"
-#include <uhd/transport/bounded_buffer.hpp>
-#include <boost/any.hpp>
+#include <boost/make_shared.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/thread/condition_variable.hpp>
@@ -41,6 +40,8 @@ public:
_nsamps_remaining(0),
_stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS),
_cmd_queue(2),
+ _async_msg_queue(100),
+ _inline_msg_queue(100),
_stream_on_off(stream_on_off)
{
//synchronously spawn a new thread
@@ -89,32 +90,47 @@ public:
/*******************************************************************
* Receive control
******************************************************************/
- void recv_post(rx_metadata_t &md, size_t &nsamps){
+ size_t recv_post(rx_metadata_t &md, const size_t nsamps){
boost::mutex::scoped_lock lock(_update_mutex);
+ //Since it timed out on the receive, check for inline messages...
+ //Must do a post check because recv() will not wake up for a message.
+ if (md.error_code == rx_metadata_t::ERROR_CODE_TIMEOUT){
+ if (_inline_msg_queue.pop_with_haste(md)) return 0;
+ }
+
//load the metadata with the expected time
md.has_time_spec = true;
md.time_spec = time_now();
//none of the stuff below matters in continuous streaming mode
- if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return;
+ if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return nsamps;
//When to stop streaming:
//The samples have been received and the stream mode is non-continuous.
//Rewrite the sample count to clip to the requested number of samples.
- if (_nsamps_remaining <= nsamps){
- nsamps = _nsamps_remaining; //set nsamps, then stop
+ if (_nsamps_remaining <= nsamps) switch(_stream_mode){
+ case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE:{
+ rx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = this->time_now();
+ metadata.error_code = rx_metadata_t::ERROR_CODE_BROKEN_CHAIN;
+ _inline_msg_queue.push_with_pop_on_full(metadata);
+ } //continue to next case...
+ case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE:
md.end_of_burst = true;
- stream_on_off(false);
- return;
+ this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ return _nsamps_remaining;
+ default: break;
}
//update the consumed samples
_nsamps_remaining -= nsamps;
+ return nsamps;
}
void issue_stream_cmd(const stream_cmd_t &cmd){
- _cmd_queue.push_with_wait(cmd);
+ _cmd_queue.push_with_wait(boost::make_shared<stream_cmd_t>(cmd));
}
void stream_on_off(bool enb){
@@ -134,7 +150,12 @@ public:
//handle late packets
if (time_at < time_now()){
- //TODO post async message
+ async_metadata_t metadata;
+ metadata.channel = 0;
+ metadata.has_time_spec = true;
+ metadata.time_spec = this->time_now();
+ metadata.event_code = async_metadata_t::EVENT_CODE_TIME_ERROR;
+ _async_msg_queue.push_with_pop_on_full(metadata);
return true;
}
@@ -153,7 +174,13 @@ public:
if (not cmd.stream_now){
time_spec_t time_at(cmd.time_spec - TWIDDLE);
if (time_at < time_now()){
- //TODO inject late cmd inline error
+ rx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = this->time_now();
+ metadata.error_code = rx_metadata_t::ERROR_CODE_LATE_COMMAND;
+ _inline_msg_queue.push_with_pop_on_full(metadata);
+ this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ return;
}
else{
sleep_until_time(lock, time_at);
@@ -180,20 +207,30 @@ public:
void recv_cmd_dispatcher(boost::barrier &spawn_barrier){
spawn_barrier.wait();
try{
- boost::any cmd;
+ boost::shared_ptr<stream_cmd_t> cmd;
while (true){
_cmd_queue.pop_with_wait(cmd);
- recv_cmd_handle_cmd(boost::any_cast<stream_cmd_t>(cmd));
+ recv_cmd_handle_cmd(*cmd);
}
} catch(const boost::thread_interrupted &){}
}
+ bounded_buffer<async_metadata_t> &get_async_queue(void){
+ return _async_msg_queue;
+ }
+
+ bounded_buffer<rx_metadata_t> &get_inline_queue(void){
+ return _inline_msg_queue;
+ }
+
private:
boost::mutex _update_mutex;
size_t _nsamps_remaining;
stream_cmd_t::stream_mode_t _stream_mode;
time_spec_t _time_offset;
- bounded_buffer<boost::any> _cmd_queue;
+ bounded_buffer<boost::shared_ptr<stream_cmd_t> > _cmd_queue;
+ bounded_buffer<async_metadata_t> _async_msg_queue;
+ bounded_buffer<rx_metadata_t> _inline_msg_queue;
const cb_fcn_type _stream_on_off;
boost::thread_group _thread_group;
};