summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-06-05 10:09:18 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-06-05 10:09:18 +0200
commit01f55e125cdc2156bb6936f70aa52d13ff40420f (patch)
tree937c4e753c618409b87ce7f4498b0c73b340b4f1
parent23ad47277e73348bca32226d87274541e56bbebb (diff)
parentc8b792fee07cfa591339cbf6f67454cb1cf4535b (diff)
downloaddabmod-01f55e125cdc2156bb6936f70aa52d13ff40420f.tar.gz
dabmod-01f55e125cdc2156bb6936f70aa52d13ff40420f.tar.bz2
dabmod-01f55e125cdc2156bb6936f70aa52d13ff40420f.zip
Merge branch 'next' into tii
-rw-r--r--src/DabMod.cpp9
-rw-r--r--src/FIRFilter.h1
-rw-r--r--src/InputReader.h3
-rw-r--r--src/InputZeroMQReader.cpp15
-rw-r--r--src/OutputUHD.cpp15
-rw-r--r--src/ThreadsafeQueue.h24
-rw-r--r--src/Utils.h18
7 files changed, 58 insertions, 27 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 2fe8d53..57f03b9 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -768,6 +768,15 @@ int launch_modulator(int argc, char* argv[])
}
#endif
+ // Set thread priority to realtime
+ const int policy = SCHED_RR;
+ sched_param sp;
+ sp.sched_priority = sched_get_priority_min(policy);
+ int thread_prio_ret = pthread_setschedparam(pthread_self(), policy, &sp);
+ if (thread_prio_ret != 0) {
+ etiLog.level(error) << "Could not set priority for Modulator thread:" << thread_prio_ret;
+ }
+
while (run_again) {
Flowgraph flowgraph;
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index 05627d4..751be91 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -31,7 +31,6 @@
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
-#include "ThreadsafeQueue.h"
#include "RemoteControl.h"
#include "ModCodec.h"
diff --git a/src/InputReader.h b/src/InputReader.h
index 37aa523..b262cc9 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -183,8 +183,7 @@ class InputZeroMQWorker
class InputZeroMQReader : public InputReader
{
public:
- InputZeroMQReader() :
- in_messages_(10)
+ InputZeroMQReader()
{
workerdata_.in_messages = &in_messages_;
workerdata_.running = false;
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 8706e1e..36d4e4b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -86,7 +86,18 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
const size_t framesize = 6144;
boost::shared_ptr<std::vector<uint8_t> > incoming;
- in_messages_.wait_and_pop(incoming);
+
+ /* Do some prebuffering because reads will happen in bursts
+ * (4 ETI frames in TM1) and we should make sure that
+ * we can serve the data required for a full transmission frame.
+ */
+ if (in_messages_.size() < 4) {
+ const size_t prebuffering = 10;
+ in_messages_.wait_and_pop(incoming, prebuffering);
+ }
+ else {
+ in_messages_.wait_and_pop(incoming);
+ }
if (! workerdata_.running) {
throw zmq_input_overflow();
@@ -193,7 +204,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
if (queue_size < 5) {
- etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << "elements !";
+ etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !";
}
}
}
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index 28df515..6ad7dfd 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -104,12 +104,8 @@ OutputUHD::OutputUHD(
RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter");
RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000");
- // TODO: find out how to use boost::bind to give the logger to the
- // uhd_msg_handler
uhd::msg::register_handler(uhd_msg_handler);
- uhd::set_thread_priority_safe();
-
//create a usrp device
MDEBUG("OutputUHD:Creating the usrp device with: %s...\n",
device.str().c_str());
@@ -353,6 +349,7 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
" to " << dataIn->getLength();
throw std::runtime_error("Non-constant input length!");
}
+
mySyncBarrier.get()->wait();
if (!uwd.running) {
@@ -477,6 +474,16 @@ void UHDWorker::process()
// Transmit timeout
const double timeout = 20.0;
+ // Set thread priority to realtime
+ const int policy = SCHED_RR;
+ sched_param sp;
+ sp.sched_priority = sched_get_priority_min(policy);
+ int ret = pthread_setschedparam(pthread_self(), policy, &sp);
+ if (ret != 0) {
+ etiLog.level(error) << "Could not set priority for UHD thread:" << ret;
+ }
+
+
#if FAKE_UHD == 0
uhd::stream_args_t stream_args("fc32"); //complex floats
uhd::tx_streamer::sptr myTxStream = uwd->myUsrp->get_tx_stream(stream_args);
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index 78e9ef0..e5e83ef 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -38,25 +38,14 @@
* that pushes elements into the queue, and one consumer that
* retrieves the elements.
*
- * The queue can make the consumer block until enough elements
- * are available.
+ * The queue can make the consumer block until an element
+ * is available.
*/
template<typename T>
class ThreadsafeQueue
{
public:
- /* Create a new queue without any minimum required
- * fill before it is possible to pop an element
- */
- ThreadsafeQueue() : the_required_size(1) {}
-
- /* Create a queue where it has to contain at least
- * required_size elements before pop is possible
- */
- ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {
- }
-
/* Push one element into the queue, and notify another thread that
* might be waiting.
*
@@ -87,14 +76,14 @@ public:
size_t size() const
{
+ boost::mutex::scoped_lock lock(the_mutex);
return the_queue.size();
}
bool try_pop(T& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
- if(the_queue.size() < the_required_size)
- {
+ if (the_queue.empty()) {
return false;
}
@@ -103,10 +92,10 @@ public:
return true;
}
- void wait_and_pop(T& popped_value)
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
boost::mutex::scoped_lock lock(the_mutex);
- while(the_queue.size() < the_required_size) {
+ while (the_queue.size() < prebuffering) {
the_condition_variable.wait(lock);
}
@@ -118,7 +107,6 @@ private:
std::queue<T> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
- size_t the_required_size;
};
#endif
diff --git a/src/Utils.h b/src/Utils.h
index 7c3129c..f023646 100644
--- a/src/Utils.h
+++ b/src/Utils.h
@@ -35,10 +35,28 @@
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
+#include <time.h>
void printUsage(char* progName);
void printVersion(void);
+inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time)
+{
+ long tv_sec;
+ long tv_nsec;
+ if (time.tv_nsec < oldTime.tv_nsec) {
+ tv_sec = time.tv_sec - 1 - oldTime.tv_sec;
+ tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec;
+ }
+ else {
+ tv_sec = time.tv_sec - oldTime.tv_sec;
+ tv_nsec = time.tv_nsec - oldTime.tv_nsec;
+ }
+
+ return tv_sec * 1000 + tv_nsec / 1000;
+}
+
+
#endif