aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/FIRFilter.h1
-rw-r--r--src/InputReader.h3
-rw-r--r--src/InputZeroMQReader.cpp32
-rw-r--r--src/OutputUHD.cpp17
-rw-r--r--src/ThreadsafeQueue.h24
-rw-r--r--src/Utils.h18
6 files changed, 72 insertions, 23 deletions
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index 05627d4..751be91 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -31,7 +31,6 @@
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
-#include "ThreadsafeQueue.h"
#include "RemoteControl.h"
#include "ModCodec.h"
diff --git a/src/InputReader.h b/src/InputReader.h
index 37aa523..b262cc9 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -183,8 +183,7 @@ class InputZeroMQWorker
class InputZeroMQReader : public InputReader
{
public:
- InputZeroMQReader() :
- in_messages_(10)
+ InputZeroMQReader()
{
workerdata_.in_messages = &in_messages_;
workerdata_.running = false;
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 8706e1e..e95644a 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -41,6 +41,7 @@
#include "porting.h"
#include "InputReader.h"
#include "PcDebug.h"
+#include "Utils.h"
#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
/* A concatenation of four ETI frames,
@@ -86,7 +87,34 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
const size_t framesize = 6144;
boost::shared_ptr<std::vector<uint8_t> > incoming;
- in_messages_.wait_and_pop(incoming);
+
+ struct timespec time_before;
+ int time_before_ret = clock_gettime(CLOCK_MONOTONIC, &time_before);
+
+ /* Do some prebuffering because reads will happen in bursts
+ * (4 ETI frames in TM1) and we should make sure that
+ * we can serve the data required for a full transmission frame.
+ */
+ if (in_messages_.size() < 4) {
+ const size_t prebuffering = 10;
+ in_messages_.wait_and_pop(incoming, prebuffering);
+ }
+ else {
+ in_messages_.wait_and_pop(incoming);
+ }
+
+ struct timespec time_after;
+ int time_after_ret = clock_gettime(CLOCK_MONOTONIC, &time_after);
+
+ if (time_before_ret == 0 and time_after_ret == 0) {
+ etiLog.level(debug) << "ZMQ Time delta : " <<
+ timespecdiff_us(time_before, time_after) << " us, queue " <<
+ in_messages_.size();
+ }
+ else {
+ etiLog.level(error) << "ZMQ Time delta failed " <<
+ time_before_ret << " " << time_after_ret;
+ }
if (! workerdata_.running) {
throw zmq_input_overflow();
@@ -193,7 +221,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
if (queue_size < 5) {
- etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << "elements !";
+ etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !";
}
}
}
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index ad31be7..acc4271 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -31,6 +31,7 @@
#include "PcDebug.h"
#include "Log.h"
#include "RemoteControl.h"
+#include "Utils.h"
#include <boost/thread/future.hpp>
@@ -349,8 +350,24 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
" to " << dataIn->getLength();
throw std::runtime_error("Non-constant input length!");
}
+
+ struct timespec time_before;
+ int time_before_ret = clock_gettime(CLOCK_MONOTONIC, &time_before);
+
mySyncBarrier.get()->wait();
+ struct timespec time_after;
+ int time_after_ret = clock_gettime(CLOCK_MONOTONIC, &time_after);
+
+ if (time_before_ret == 0 and time_after_ret == 0) {
+ etiLog.level(debug) << "Time delta : " <<
+ timespecdiff_us(time_before, time_after) << " us";
+ }
+ else {
+ etiLog.level(error) << "Time delta failed " <<
+ time_before_ret << " " << time_after_ret;
+ }
+
if (!uwd.running) {
worker.stop();
first_run = true;
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index 78e9ef0..e5e83ef 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -38,25 +38,14 @@
* that pushes elements into the queue, and one consumer that
* retrieves the elements.
*
- * The queue can make the consumer block until enough elements
- * are available.
+ * The queue can make the consumer block until an element
+ * is available.
*/
template<typename T>
class ThreadsafeQueue
{
public:
- /* Create a new queue without any minimum required
- * fill before it is possible to pop an element
- */
- ThreadsafeQueue() : the_required_size(1) {}
-
- /* Create a queue where it has to contain at least
- * required_size elements before pop is possible
- */
- ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {
- }
-
/* Push one element into the queue, and notify another thread that
* might be waiting.
*
@@ -87,14 +76,14 @@ public:
size_t size() const
{
+ boost::mutex::scoped_lock lock(the_mutex);
return the_queue.size();
}
bool try_pop(T& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
- if(the_queue.size() < the_required_size)
- {
+ if (the_queue.empty()) {
return false;
}
@@ -103,10 +92,10 @@ public:
return true;
}
- void wait_and_pop(T& popped_value)
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
boost::mutex::scoped_lock lock(the_mutex);
- while(the_queue.size() < the_required_size) {
+ while (the_queue.size() < prebuffering) {
the_condition_variable.wait(lock);
}
@@ -118,7 +107,6 @@ private:
std::queue<T> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
- size_t the_required_size;
};
#endif
diff --git a/src/Utils.h b/src/Utils.h
index 7c3129c..f023646 100644
--- a/src/Utils.h
+++ b/src/Utils.h
@@ -35,10 +35,28 @@
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
+#include <time.h>
void printUsage(char* progName);
void printVersion(void);
+inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time)
+{
+ long tv_sec;
+ long tv_nsec;
+ if (time.tv_nsec < oldTime.tv_nsec) {
+ tv_sec = time.tv_sec - 1 - oldTime.tv_sec;
+ tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec;
+ }
+ else {
+ tv_sec = time.tv_sec - oldTime.tv_sec;
+ tv_nsec = time.tv_nsec - oldTime.tv_nsec;
+ }
+
+ return tv_sec * 1000 + tv_nsec / 1000;
+}
+
+
#endif