diff options
Diffstat (limited to 'host/lib/usrp/usrp1/soft_time_ctrl.cpp')
-rw-r--r-- | host/lib/usrp/usrp1/soft_time_ctrl.cpp | 65 |
1 files changed, 51 insertions, 14 deletions
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index 1bab34e7b..ac0899e28 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -16,8 +16,7 @@ // #include "soft_time_ctrl.hpp" -#include <uhd/transport/bounded_buffer.hpp> -#include <boost/any.hpp> +#include <boost/make_shared.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/barrier.hpp> #include <boost/thread/condition_variable.hpp> @@ -41,6 +40,8 @@ public: _nsamps_remaining(0), _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS), _cmd_queue(2), + _async_msg_queue(100), + _inline_msg_queue(100), _stream_on_off(stream_on_off) { //synchronously spawn a new thread @@ -89,32 +90,47 @@ public: /******************************************************************* * Receive control ******************************************************************/ - void recv_post(rx_metadata_t &md, size_t &nsamps){ + size_t recv_post(rx_metadata_t &md, const size_t nsamps){ boost::mutex::scoped_lock lock(_update_mutex); + //Since it timed out on the receive, check for inline messages... + //Must do a post check because recv() will not wake up for a message. + if (md.error_code == rx_metadata_t::ERROR_CODE_TIMEOUT){ + if (_inline_msg_queue.pop_with_haste(md)) return 0; + } + //load the metadata with the expected time md.has_time_spec = true; md.time_spec = time_now(); //none of the stuff below matters in continuous streaming mode - if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return; + if (_stream_mode == stream_cmd_t::STREAM_MODE_START_CONTINUOUS) return nsamps; //When to stop streaming: //The samples have been received and the stream mode is non-continuous. //Rewrite the sample count to clip to the requested number of samples. - if (_nsamps_remaining <= nsamps){ - nsamps = _nsamps_remaining; //set nsamps, then stop + if (_nsamps_remaining <= nsamps) switch(_stream_mode){ + case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE:{ + rx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.error_code = rx_metadata_t::ERROR_CODE_BROKEN_CHAIN; + _inline_msg_queue.push_with_pop_on_full(metadata); + } //continue to next case... + case stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE: md.end_of_burst = true; - stream_on_off(false); - return; + this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); + return _nsamps_remaining; + default: break; } //update the consumed samples _nsamps_remaining -= nsamps; + return nsamps; } void issue_stream_cmd(const stream_cmd_t &cmd){ - _cmd_queue.push_with_wait(cmd); + _cmd_queue.push_with_wait(boost::make_shared<stream_cmd_t>(cmd)); } void stream_on_off(bool enb){ @@ -134,7 +150,12 @@ public: //handle late packets if (time_at < time_now()){ - //TODO post async message + async_metadata_t metadata; + metadata.channel = 0; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.event_code = async_metadata_t::EVENT_CODE_TIME_ERROR; + _async_msg_queue.push_with_pop_on_full(metadata); return true; } @@ -153,7 +174,13 @@ public: if (not cmd.stream_now){ time_spec_t time_at(cmd.time_spec - TWIDDLE); if (time_at < time_now()){ - //TODO inject late cmd inline error + rx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = this->time_now(); + metadata.error_code = rx_metadata_t::ERROR_CODE_LATE_COMMAND; + _inline_msg_queue.push_with_pop_on_full(metadata); + this->issue_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); + return; } else{ sleep_until_time(lock, time_at); @@ -180,20 +207,30 @@ public: void recv_cmd_dispatcher(boost::barrier &spawn_barrier){ spawn_barrier.wait(); try{ - boost::any cmd; + boost::shared_ptr<stream_cmd_t> cmd; while (true){ _cmd_queue.pop_with_wait(cmd); - recv_cmd_handle_cmd(boost::any_cast<stream_cmd_t>(cmd)); + recv_cmd_handle_cmd(*cmd); } } catch(const boost::thread_interrupted &){} } + bounded_buffer<async_metadata_t> &get_async_queue(void){ + return _async_msg_queue; + } + + bounded_buffer<rx_metadata_t> &get_inline_queue(void){ + return _inline_msg_queue; + } + private: boost::mutex _update_mutex; size_t _nsamps_remaining; stream_cmd_t::stream_mode_t _stream_mode; time_spec_t _time_offset; - bounded_buffer<boost::any> _cmd_queue; + bounded_buffer<boost::shared_ptr<stream_cmd_t> > _cmd_queue; + bounded_buffer<async_metadata_t> _async_msg_queue; + bounded_buffer<rx_metadata_t> _inline_msg_queue; const cb_fcn_type _stream_on_off; boost::thread_group _thread_group; }; |