aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/example.ini3
-rw-r--r--src/DabMod.cpp10
-rw-r--r--src/InputReader.h6
-rw-r--r--src/InputZeroMQReader.cpp9
4 files changed, 19 insertions, 9 deletions
diff --git a/doc/example.ini b/doc/example.ini
index cec0f23..3c51142 100644
--- a/doc/example.ini
+++ b/doc/example.ini
@@ -49,6 +49,9 @@ loop=0
; When recieving data using ZeroMQ, the source is the URI to be used
;transport=zeromq
;source=tcp://localhost:8080
+; The option max_frames_queued defines the maximum number of ETI frames
+; that can be in the input queue
+;max_frames_queued=100
[modulator]
; Gain mode: 0=FIX, 1=MAX, 2=VAR
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 1bbfc99..f546e45 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -68,6 +68,8 @@
# define memalign(a, b) malloc(b)
#endif
+#define ZMQ_INPUT_MAX_FRAME_QUEUE 50
+
typedef std::complex<float> complexf;
@@ -87,6 +89,7 @@ int main(int argc, char* argv[])
bool loop = false;
std::string inputName = "";
std::string inputTransport = "file";
+ unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE;
std::string outputName;
int useZeroMQOutput = 0;
@@ -362,6 +365,9 @@ int main(int argc, char* argv[])
}
inputTransport = pt.get("input.transport", "file");
+ inputMaxFramesQueued = pt.get("input.max_frames_queued",
+ ZMQ_INPUT_MAX_FRAME_QUEUE);
+
inputName = pt.get("input.source", "/dev/stdin");
// log parameters:
@@ -677,10 +683,10 @@ int main(int argc, char* argv[])
#else
// The URL might start with zmq+tcp://
if (inputName.substr(0, 4) == "zmq+") {
- inputZeroMQReader.Open(inputName.substr(4));
+ inputZeroMQReader.Open(inputName.substr(4), inputMaxFramesQueued);
}
else {
- inputZeroMQReader.Open(inputName);
+ inputZeroMQReader.Open(inputName, inputMaxFramesQueued);
}
inputReader = &inputZeroMQReader;
#endif
diff --git a/src/InputReader.h b/src/InputReader.h
index 3e0dcab..3e3e000 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyrigth (C) 2013
+ Copyrigth (C) 2013, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -137,6 +137,7 @@ struct InputZeroMQThreadData
{
ThreadsafeQueue<uint8_t*> *in_messages;
std::string uri;
+ unsigned max_queued_frames;
};
class InputZeroMQWorker
@@ -179,7 +180,7 @@ class InputZeroMQReader : public InputReader
worker_.Stop();
}
- int Open(std::string uri);
+ int Open(const std::string& uri, unsigned max_queued_frames);
int GetNextFrame(void* buffer);
@@ -197,3 +198,4 @@ class InputZeroMQReader : public InputReader
#endif
#endif
+
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index f7f5702..01d8720 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2013, 2014
+ Copyright (C) 2013, 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -41,8 +41,6 @@
#include "InputReader.h"
#include "PcDebug.h"
-#define MAX_QUEUE_SIZE 50
-
#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
/* A concatenation of four ETI frames,
* whose maximal size is 6144.
@@ -64,10 +62,11 @@ struct zmq_dab_message_t
uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
};
-int InputZeroMQReader::Open(std::string uri)
+int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames)
{
uri_ = uri;
workerdata_.uri = uri;
+ workerdata_.max_queued_frames = max_queued_frames;
// launch receiver thread
worker_.Start(&workerdata_);
@@ -123,7 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
m_to_drop--;
}
- else if (queue_size < MAX_QUEUE_SIZE) {
+ else if (queue_size < workerdata->max_queued_frames) {
if (buffer_full) {
fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
queue_size);