diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-06-05 10:09:18 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-06-05 10:09:18 +0200 |
commit | 01f55e125cdc2156bb6936f70aa52d13ff40420f (patch) | |
tree | 937c4e753c618409b87ce7f4498b0c73b340b4f1 | |
parent | 23ad47277e73348bca32226d87274541e56bbebb (diff) | |
parent | c8b792fee07cfa591339cbf6f67454cb1cf4535b (diff) | |
download | dabmod-01f55e125cdc2156bb6936f70aa52d13ff40420f.tar.gz dabmod-01f55e125cdc2156bb6936f70aa52d13ff40420f.tar.bz2 dabmod-01f55e125cdc2156bb6936f70aa52d13ff40420f.zip |
Merge branch 'next' into tii
-rw-r--r-- | src/DabMod.cpp | 9 | ||||
-rw-r--r-- | src/FIRFilter.h | 1 | ||||
-rw-r--r-- | src/InputReader.h | 3 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 15 | ||||
-rw-r--r-- | src/OutputUHD.cpp | 15 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 24 | ||||
-rw-r--r-- | src/Utils.h | 18 |
7 files changed, 58 insertions, 27 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 2fe8d53..57f03b9 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -768,6 +768,15 @@ int launch_modulator(int argc, char* argv[]) } #endif + // Set thread priority to realtime + const int policy = SCHED_RR; + sched_param sp; + sp.sched_priority = sched_get_priority_min(policy); + int thread_prio_ret = pthread_setschedparam(pthread_self(), policy, &sp); + if (thread_prio_ret != 0) { + etiLog.level(error) << "Could not set priority for Modulator thread:" << thread_prio_ret; + } + while (run_again) { Flowgraph flowgraph; diff --git a/src/FIRFilter.h b/src/FIRFilter.h index 05627d4..751be91 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -31,7 +31,6 @@ #include <boost/thread.hpp> #include <boost/shared_ptr.hpp> -#include "ThreadsafeQueue.h" #include "RemoteControl.h" #include "ModCodec.h" diff --git a/src/InputReader.h b/src/InputReader.h index 37aa523..b262cc9 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -183,8 +183,7 @@ class InputZeroMQWorker class InputZeroMQReader : public InputReader { public: - InputZeroMQReader() : - in_messages_(10) + InputZeroMQReader() { workerdata_.in_messages = &in_messages_; workerdata_.running = false; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 8706e1e..36d4e4b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -86,7 +86,18 @@ int InputZeroMQReader::GetNextFrame(void* buffer) const size_t framesize = 6144; boost::shared_ptr<std::vector<uint8_t> > incoming; - in_messages_.wait_and_pop(incoming); + + /* Do some prebuffering because reads will happen in bursts + * (4 ETI frames in TM1) and we should make sure that + * we can serve the data required for a full transmission frame. + */ + if (in_messages_.size() < 4) { + const size_t prebuffering = 10; + in_messages_.wait_and_pop(incoming, prebuffering); + } + else { + in_messages_.wait_and_pop(incoming); + } if (! workerdata_.running) { throw zmq_input_overflow(); @@ -193,7 +204,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } if (queue_size < 5) { - etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << "elements !"; + etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; } } } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 28df515..6ad7dfd 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -104,12 +104,8 @@ OutputUHD::OutputUHD( RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter"); RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000"); - // TODO: find out how to use boost::bind to give the logger to the - // uhd_msg_handler uhd::msg::register_handler(uhd_msg_handler); - uhd::set_thread_priority_safe(); - //create a usrp device MDEBUG("OutputUHD:Creating the usrp device with: %s...\n", device.str().c_str()); @@ -353,6 +349,7 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) " to " << dataIn->getLength(); throw std::runtime_error("Non-constant input length!"); } + mySyncBarrier.get()->wait(); if (!uwd.running) { @@ -477,6 +474,16 @@ void UHDWorker::process() // Transmit timeout const double timeout = 20.0; + // Set thread priority to realtime + const int policy = SCHED_RR; + sched_param sp; + sp.sched_priority = sched_get_priority_min(policy); + int ret = pthread_setschedparam(pthread_self(), policy, &sp); + if (ret != 0) { + etiLog.level(error) << "Could not set priority for UHD thread:" << ret; + } + + #if FAKE_UHD == 0 uhd::stream_args_t stream_args("fc32"); //complex floats uhd::tx_streamer::sptr myTxStream = uwd->myUsrp->get_tx_stream(stream_args); diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index 78e9ef0..e5e83ef 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -38,25 +38,14 @@ * that pushes elements into the queue, and one consumer that * retrieves the elements. * - * The queue can make the consumer block until enough elements - * are available. + * The queue can make the consumer block until an element + * is available. */ template<typename T> class ThreadsafeQueue { public: - /* Create a new queue without any minimum required - * fill before it is possible to pop an element - */ - ThreadsafeQueue() : the_required_size(1) {} - - /* Create a queue where it has to contain at least - * required_size elements before pop is possible - */ - ThreadsafeQueue(size_t required_size) : the_required_size(required_size) { - } - /* Push one element into the queue, and notify another thread that * might be waiting. * @@ -87,14 +76,14 @@ public: size_t size() const { + boost::mutex::scoped_lock lock(the_mutex); return the_queue.size(); } bool try_pop(T& popped_value) { boost::mutex::scoped_lock lock(the_mutex); - if(the_queue.size() < the_required_size) - { + if (the_queue.empty()) { return false; } @@ -103,10 +92,10 @@ public: return true; } - void wait_and_pop(T& popped_value) + void wait_and_pop(T& popped_value, size_t prebuffering = 1) { boost::mutex::scoped_lock lock(the_mutex); - while(the_queue.size() < the_required_size) { + while (the_queue.size() < prebuffering) { the_condition_variable.wait(lock); } @@ -118,7 +107,6 @@ private: std::queue<T> the_queue; mutable boost::mutex the_mutex; boost::condition_variable the_condition_variable; - size_t the_required_size; }; #endif diff --git a/src/Utils.h b/src/Utils.h index 7c3129c..f023646 100644 --- a/src/Utils.h +++ b/src/Utils.h @@ -35,10 +35,28 @@ #include <stdlib.h> #include <unistd.h> #include <stdio.h> +#include <time.h> void printUsage(char* progName); void printVersion(void); +inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time) +{ + long tv_sec; + long tv_nsec; + if (time.tv_nsec < oldTime.tv_nsec) { + tv_sec = time.tv_sec - 1 - oldTime.tv_sec; + tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec; + } + else { + tv_sec = time.tv_sec - oldTime.tv_sec; + tv_nsec = time.tv_nsec - oldTime.tv_nsec; + } + + return tv_sec * 1000 + tv_nsec / 1000; +} + + #endif |