diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-20 10:12:06 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-20 10:12:06 +0100 |
commit | c5c21c73c310c29675bff1a1f2da4ddd298c0f92 (patch) | |
tree | b58887652b2c0605b0d7d145377638cd7c754187 | |
parent | 7bacaf3818494cf706f7372cb6cd194dbaf3f251 (diff) | |
download | dabmod-c5c21c73c310c29675bff1a1f2da4ddd298c0f92.tar.gz dabmod-c5c21c73c310c29675bff1a1f2da4ddd298c0f92.tar.bz2 dabmod-c5c21c73c310c29675bff1a1f2da4ddd298c0f92.zip |
Add max_frames_queued option for zmq input
-rw-r--r-- | doc/example.ini | 3 | ||||
-rw-r--r-- | src/DabMod.cpp | 10 | ||||
-rw-r--r-- | src/InputReader.h | 6 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 9 |
4 files changed, 19 insertions, 9 deletions
diff --git a/doc/example.ini b/doc/example.ini index cec0f23..3c51142 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -49,6 +49,9 @@ loop=0 ; When recieving data using ZeroMQ, the source is the URI to be used ;transport=zeromq ;source=tcp://localhost:8080 +; The option max_frames_queued defines the maximum number of ETI frames +; that can be in the input queue +;max_frames_queued=100 [modulator] ; Gain mode: 0=FIX, 1=MAX, 2=VAR diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 1bbfc99..f546e45 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -68,6 +68,8 @@ # define memalign(a, b) malloc(b) #endif +#define ZMQ_INPUT_MAX_FRAME_QUEUE 50 + typedef std::complex<float> complexf; @@ -87,6 +89,7 @@ int main(int argc, char* argv[]) bool loop = false; std::string inputName = ""; std::string inputTransport = "file"; + unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE; std::string outputName; int useZeroMQOutput = 0; @@ -362,6 +365,9 @@ int main(int argc, char* argv[]) } inputTransport = pt.get("input.transport", "file"); + inputMaxFramesQueued = pt.get("input.max_frames_queued", + ZMQ_INPUT_MAX_FRAME_QUEUE); + inputName = pt.get("input.source", "/dev/stdin"); // log parameters: @@ -677,10 +683,10 @@ int main(int argc, char* argv[]) #else // The URL might start with zmq+tcp:// if (inputName.substr(0, 4) == "zmq+") { - inputZeroMQReader.Open(inputName.substr(4)); + inputZeroMQReader.Open(inputName.substr(4), inputMaxFramesQueued); } else { - inputZeroMQReader.Open(inputName); + inputZeroMQReader.Open(inputName, inputMaxFramesQueued); } inputReader = &inputZeroMQReader; #endif diff --git a/src/InputReader.h b/src/InputReader.h index 3e0dcab..3e3e000 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyrigth (C) 2013 + Copyrigth (C) 2013, 2015 Matthias P. Braendli, matthias.braendli@mpb.li */ /* @@ -137,6 +137,7 @@ struct InputZeroMQThreadData { ThreadsafeQueue<uint8_t*> *in_messages; std::string uri; + unsigned max_queued_frames; }; class InputZeroMQWorker @@ -179,7 +180,7 @@ class InputZeroMQReader : public InputReader worker_.Stop(); } - int Open(std::string uri); + int Open(const std::string& uri, unsigned max_queued_frames); int GetNextFrame(void* buffer); @@ -197,3 +198,4 @@ class InputZeroMQReader : public InputReader #endif #endif + diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f7f5702..01d8720 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2013, 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -41,8 +41,6 @@ #include "InputReader.h" #include "PcDebug.h" -#define MAX_QUEUE_SIZE 50 - #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. @@ -64,10 +62,11 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(std::string uri) +int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames) { uri_ = uri; workerdata_.uri = uri; + workerdata_.max_queued_frames = max_queued_frames; // launch receiver thread worker_.Start(&workerdata_); @@ -123,7 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } m_to_drop--; } - else if (queue_size < MAX_QUEUE_SIZE) { + else if (queue_size < workerdata->max_queued_frames) { if (buffer_full) { fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", queue_size); |