summaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp33
1 files changed, 25 insertions, 8 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index cfb56b2..f8c15c4 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2013, 2014
+ Copyright (C) 2013, 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -29,7 +29,7 @@
# include "config.h"
#endif
-#if defined(HAVE_INPUT_ZEROMQ)
+#if defined(HAVE_ZEROMQ)
#include <string>
#include <cstring>
@@ -41,8 +41,6 @@
#include "InputReader.h"
#include "PcDebug.h"
-#define MAX_QUEUE_SIZE 50
-
#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
/* A concatenation of four ETI frames,
* whose maximal size is 6144.
@@ -64,10 +62,18 @@ struct zmq_dab_message_t
uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
};
-int InputZeroMQReader::Open(std::string uri)
+int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames)
{
- uri_ = uri;
+ // The URL might start with zmq+tcp://
+ if (uri.substr(0, 4) == "zmq+") {
+ uri_ = uri.substr(4);
+ }
+ else {
+ uri_ = uri;
+ }
+
workerdata_.uri = uri;
+ workerdata_.max_queued_frames = max_queued_frames;
// launch receiver thread
worker_.Start(&workerdata_);
@@ -81,6 +87,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
uint8_t* incoming;
in_messages_.wait_and_pop(incoming);
+ if (! workerdata_.running) {
+ throw zmq_input_overflow();
+ }
+
memcpy(buffer, incoming, framesize);
delete incoming;
@@ -123,7 +133,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
m_to_drop--;
}
- else if (queue_size < MAX_QUEUE_SIZE) {
+ else if (queue_size < workerdata->max_queued_frames) {
if (buffer_full) {
fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
queue_size);
@@ -175,6 +185,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
fprintf(stderr, "ZeroMQ buffer overfull !\n");
buffer_full = true;
+ throw std::runtime_error("ZMQ input full");
}
queue_size = workerdata->in_messages->size();
@@ -188,7 +199,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
if (queue_size < 5) {
- fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n",
+ fprintf(stderr, "ZeroMQ buffer low: %zu elements !\n",
queue_size);
}
}
@@ -196,15 +207,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
catch (zmq::error_t& err) {
fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
+ catch (std::exception& err) {
+ }
fprintf(stderr, "ZeroMQ input worker terminated\n");
subscriber.close();
+
+ workerdata->running = false;
+ workerdata->in_messages->notify();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
{
running = true;
+ workerdata->running = true;
recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
}