From c9ea7fb88809d935f3eaeee108415ff5abd17ead Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 5 Jun 2015 09:10:52 +0200 Subject: Set priorities for modulator and UHD threads --- src/DabMod.cpp | 9 +++++++++ src/OutputUHD.cpp | 14 ++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 2fe8d53..57f03b9 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -768,6 +768,15 @@ int launch_modulator(int argc, char* argv[]) } #endif + // Set thread priority to realtime + const int policy = SCHED_RR; + sched_param sp; + sp.sched_priority = sched_get_priority_min(policy); + int thread_prio_ret = pthread_setschedparam(pthread_self(), policy, &sp); + if (thread_prio_ret != 0) { + etiLog.level(error) << "Could not set priority for Modulator thread:" << thread_prio_ret; + } + while (run_again) { Flowgraph flowgraph; diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 28df515..ad31be7 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -104,12 +104,8 @@ OutputUHD::OutputUHD( RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter"); RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000"); - // TODO: find out how to use boost::bind to give the logger to the - // uhd_msg_handler uhd::msg::register_handler(uhd_msg_handler); - uhd::set_thread_priority_safe(); - //create a usrp device MDEBUG("OutputUHD:Creating the usrp device with: %s...\n", device.str().c_str()); @@ -477,6 +473,16 @@ void UHDWorker::process() // Transmit timeout const double timeout = 20.0; + // Set thread priority to realtime + const int policy = SCHED_RR; + sched_param sp; + sp.sched_priority = sched_get_priority_min(policy); + int ret = pthread_setschedparam(pthread_self(), policy, &sp); + if (ret != 0) { + etiLog.level(error) << "Could not set priority for UHD thread:" << ret; + } + + #if FAKE_UHD == 0 uhd::stream_args_t stream_args("fc32"); //complex floats uhd::tx_streamer::sptr myTxStream = uwd->myUsrp->get_tx_stream(stream_args); -- cgit v1.2.3 From 71eb84d5f483af8d22402de3d2ec70b08b5802d3 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 5 Jun 2015 09:15:56 +0200 Subject: Fix intermittent underruns after a restart When using UHD without synchronous=1: After a modulator restart, the input ZMQ buffer is nearly empty, and the modulator actually gets blocked by the ThreadsafeQueue. This is visible by looking at the time deltas printed in the debugging code. This commit adds a minimal form of prebuffering (10 ETI frames) to the ZeroMQ input, and removes the useless minimum required size for the ThreadsafeQueue. In synchronous=1, the issue is not visible because the TIST defines the time to transmit, which will cause the ZMQ buffer to fill. --- src/FIRFilter.h | 1 - src/InputReader.h | 3 +-- src/InputZeroMQReader.cpp | 32 ++++++++++++++++++++++++++++++-- src/OutputUHD.cpp | 17 +++++++++++++++++ src/ThreadsafeQueue.h | 24 ++++++------------------ src/Utils.h | 18 ++++++++++++++++++ 6 files changed, 72 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/FIRFilter.h b/src/FIRFilter.h index 05627d4..751be91 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -31,7 +31,6 @@ #include #include -#include "ThreadsafeQueue.h" #include "RemoteControl.h" #include "ModCodec.h" diff --git a/src/InputReader.h b/src/InputReader.h index 37aa523..b262cc9 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -183,8 +183,7 @@ class InputZeroMQWorker class InputZeroMQReader : public InputReader { public: - InputZeroMQReader() : - in_messages_(10) + InputZeroMQReader() { workerdata_.in_messages = &in_messages_; workerdata_.running = false; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 8706e1e..e95644a 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -41,6 +41,7 @@ #include "porting.h" #include "InputReader.h" #include "PcDebug.h" +#include "Utils.h" #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, @@ -86,7 +87,34 @@ int InputZeroMQReader::GetNextFrame(void* buffer) const size_t framesize = 6144; boost::shared_ptr > incoming; - in_messages_.wait_and_pop(incoming); + + struct timespec time_before; + int time_before_ret = clock_gettime(CLOCK_MONOTONIC, &time_before); + + /* Do some prebuffering because reads will happen in bursts + * (4 ETI frames in TM1) and we should make sure that + * we can serve the data required for a full transmission frame. + */ + if (in_messages_.size() < 4) { + const size_t prebuffering = 10; + in_messages_.wait_and_pop(incoming, prebuffering); + } + else { + in_messages_.wait_and_pop(incoming); + } + + struct timespec time_after; + int time_after_ret = clock_gettime(CLOCK_MONOTONIC, &time_after); + + if (time_before_ret == 0 and time_after_ret == 0) { + etiLog.level(debug) << "ZMQ Time delta : " << + timespecdiff_us(time_before, time_after) << " us, queue " << + in_messages_.size(); + } + else { + etiLog.level(error) << "ZMQ Time delta failed " << + time_before_ret << " " << time_after_ret; + } if (! workerdata_.running) { throw zmq_input_overflow(); @@ -193,7 +221,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } if (queue_size < 5) { - etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << "elements !"; + etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; } } } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index ad31be7..acc4271 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -31,6 +31,7 @@ #include "PcDebug.h" #include "Log.h" #include "RemoteControl.h" +#include "Utils.h" #include @@ -349,8 +350,24 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) " to " << dataIn->getLength(); throw std::runtime_error("Non-constant input length!"); } + + struct timespec time_before; + int time_before_ret = clock_gettime(CLOCK_MONOTONIC, &time_before); + mySyncBarrier.get()->wait(); + struct timespec time_after; + int time_after_ret = clock_gettime(CLOCK_MONOTONIC, &time_after); + + if (time_before_ret == 0 and time_after_ret == 0) { + etiLog.level(debug) << "Time delta : " << + timespecdiff_us(time_before, time_after) << " us"; + } + else { + etiLog.level(error) << "Time delta failed " << + time_before_ret << " " << time_after_ret; + } + if (!uwd.running) { worker.stop(); first_run = true; diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index 78e9ef0..e5e83ef 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -38,25 +38,14 @@ * that pushes elements into the queue, and one consumer that * retrieves the elements. * - * The queue can make the consumer block until enough elements - * are available. + * The queue can make the consumer block until an element + * is available. */ template class ThreadsafeQueue { public: - /* Create a new queue without any minimum required - * fill before it is possible to pop an element - */ - ThreadsafeQueue() : the_required_size(1) {} - - /* Create a queue where it has to contain at least - * required_size elements before pop is possible - */ - ThreadsafeQueue(size_t required_size) : the_required_size(required_size) { - } - /* Push one element into the queue, and notify another thread that * might be waiting. * @@ -87,14 +76,14 @@ public: size_t size() const { + boost::mutex::scoped_lock lock(the_mutex); return the_queue.size(); } bool try_pop(T& popped_value) { boost::mutex::scoped_lock lock(the_mutex); - if(the_queue.size() < the_required_size) - { + if (the_queue.empty()) { return false; } @@ -103,10 +92,10 @@ public: return true; } - void wait_and_pop(T& popped_value) + void wait_and_pop(T& popped_value, size_t prebuffering = 1) { boost::mutex::scoped_lock lock(the_mutex); - while(the_queue.size() < the_required_size) { + while (the_queue.size() < prebuffering) { the_condition_variable.wait(lock); } @@ -118,7 +107,6 @@ private: std::queue the_queue; mutable boost::mutex the_mutex; boost::condition_variable the_condition_variable; - size_t the_required_size; }; #endif diff --git a/src/Utils.h b/src/Utils.h index 7c3129c..f023646 100644 --- a/src/Utils.h +++ b/src/Utils.h @@ -35,10 +35,28 @@ #include #include #include +#include void printUsage(char* progName); void printVersion(void); +inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time) +{ + long tv_sec; + long tv_nsec; + if (time.tv_nsec < oldTime.tv_nsec) { + tv_sec = time.tv_sec - 1 - oldTime.tv_sec; + tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec; + } + else { + tv_sec = time.tv_sec - oldTime.tv_sec; + tv_nsec = time.tv_nsec - oldTime.tv_nsec; + } + + return tv_sec * 1000 + tv_nsec / 1000; +} + + #endif -- cgit v1.2.3 From c8b792fee07cfa591339cbf6f67454cb1cf4535b Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 5 Jun 2015 09:24:02 +0200 Subject: Remove debugging prints --- src/InputZeroMQReader.cpp | 17 ----------------- src/OutputUHD.cpp | 16 ---------------- 2 files changed, 33 deletions(-) (limited to 'src') diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index e95644a..36d4e4b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -41,7 +41,6 @@ #include "porting.h" #include "InputReader.h" #include "PcDebug.h" -#include "Utils.h" #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, @@ -88,9 +87,6 @@ int InputZeroMQReader::GetNextFrame(void* buffer) boost::shared_ptr > incoming; - struct timespec time_before; - int time_before_ret = clock_gettime(CLOCK_MONOTONIC, &time_before); - /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that * we can serve the data required for a full transmission frame. @@ -103,19 +99,6 @@ int InputZeroMQReader::GetNextFrame(void* buffer) in_messages_.wait_and_pop(incoming); } - struct timespec time_after; - int time_after_ret = clock_gettime(CLOCK_MONOTONIC, &time_after); - - if (time_before_ret == 0 and time_after_ret == 0) { - etiLog.level(debug) << "ZMQ Time delta : " << - timespecdiff_us(time_before, time_after) << " us, queue " << - in_messages_.size(); - } - else { - etiLog.level(error) << "ZMQ Time delta failed " << - time_before_ret << " " << time_after_ret; - } - if (! workerdata_.running) { throw zmq_input_overflow(); } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index acc4271..6ad7dfd 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -31,7 +31,6 @@ #include "PcDebug.h" #include "Log.h" #include "RemoteControl.h" -#include "Utils.h" #include @@ -351,23 +350,8 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) throw std::runtime_error("Non-constant input length!"); } - struct timespec time_before; - int time_before_ret = clock_gettime(CLOCK_MONOTONIC, &time_before); - mySyncBarrier.get()->wait(); - struct timespec time_after; - int time_after_ret = clock_gettime(CLOCK_MONOTONIC, &time_after); - - if (time_before_ret == 0 and time_after_ret == 0) { - etiLog.level(debug) << "Time delta : " << - timespecdiff_us(time_before, time_after) << " us"; - } - else { - etiLog.level(error) << "Time delta failed " << - time_before_ret << " " << time_after_ret; - } - if (!uwd.running) { worker.stop(); first_run = true; -- cgit v1.2.3