diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-20 10:12:06 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-20 10:12:06 +0100 | 
| commit | c5c21c73c310c29675bff1a1f2da4ddd298c0f92 (patch) | |
| tree | b58887652b2c0605b0d7d145377638cd7c754187 /src | |
| parent | 7bacaf3818494cf706f7372cb6cd194dbaf3f251 (diff) | |
| download | dabmod-c5c21c73c310c29675bff1a1f2da4ddd298c0f92.tar.gz dabmod-c5c21c73c310c29675bff1a1f2da4ddd298c0f92.tar.bz2 dabmod-c5c21c73c310c29675bff1a1f2da4ddd298c0f92.zip  | |
Add max_frames_queued option for zmq input
Diffstat (limited to 'src')
| -rw-r--r-- | src/DabMod.cpp | 10 | ||||
| -rw-r--r-- | src/InputReader.h | 6 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 9 | 
3 files changed, 16 insertions, 9 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 1bbfc99..f546e45 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -68,6 +68,8 @@  #   define memalign(a, b)   malloc(b)  #endif +#define ZMQ_INPUT_MAX_FRAME_QUEUE 50 +  typedef std::complex<float> complexf; @@ -87,6 +89,7 @@ int main(int argc, char* argv[])      bool loop = false;      std::string inputName = "";      std::string inputTransport = "file"; +    unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE;      std::string outputName;      int useZeroMQOutput = 0; @@ -362,6 +365,9 @@ int main(int argc, char* argv[])          }          inputTransport = pt.get("input.transport", "file"); +        inputMaxFramesQueued = pt.get("input.max_frames_queued", +                ZMQ_INPUT_MAX_FRAME_QUEUE); +          inputName = pt.get("input.source", "/dev/stdin");          // log parameters: @@ -677,10 +683,10 @@ int main(int argc, char* argv[])  #else          // The URL might start with zmq+tcp://          if (inputName.substr(0, 4) == "zmq+") { -            inputZeroMQReader.Open(inputName.substr(4)); +            inputZeroMQReader.Open(inputName.substr(4), inputMaxFramesQueued);          }          else { -            inputZeroMQReader.Open(inputName); +            inputZeroMQReader.Open(inputName, inputMaxFramesQueued);          }          inputReader = &inputZeroMQReader;  #endif diff --git a/src/InputReader.h b/src/InputReader.h index 3e0dcab..3e3e000 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyrigth (C) 2013 +   Copyrigth (C) 2013, 2015     Matthias P. Braendli, matthias.braendli@mpb.li   */  /* @@ -137,6 +137,7 @@ struct InputZeroMQThreadData  {      ThreadsafeQueue<uint8_t*> *in_messages;      std::string uri; +    unsigned max_queued_frames;  };  class InputZeroMQWorker @@ -179,7 +180,7 @@ class InputZeroMQReader : public InputReader              worker_.Stop();          } -        int Open(std::string uri); +        int Open(const std::string& uri, unsigned max_queued_frames);          int GetNextFrame(void* buffer); @@ -197,3 +198,4 @@ class InputZeroMQReader : public InputReader  #endif  #endif + diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f7f5702..01d8720 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2013, 2014 +   Copyright (C) 2013, 2014, 2015     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -41,8 +41,6 @@  #include "InputReader.h"  #include "PcDebug.h" -#define MAX_QUEUE_SIZE 50 -  #define NUM_FRAMES_PER_ZMQ_MESSAGE 4  /* A concatenation of four ETI frames,   * whose maximal size is 6144. @@ -64,10 +62,11 @@ struct zmq_dab_message_t      uint8_t  buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];  }; -int InputZeroMQReader::Open(std::string uri) +int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames)  {      uri_ = uri;      workerdata_.uri = uri; +    workerdata_.max_queued_frames = max_queued_frames;      // launch receiver thread      worker_.Start(&workerdata_); @@ -123,7 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                  }                  m_to_drop--;              } -            else if (queue_size < MAX_QUEUE_SIZE) { +            else if (queue_size < workerdata->max_queued_frames) {                  if (buffer_full) {                      fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",                              queue_size);  | 
