aboutsummaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp27
1 files changed, 23 insertions, 4 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 6b81ec2..4b55434 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -88,6 +88,12 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
{
size_t queue_size = 0;
+ bool buffer_full = false;
+
+ zmq::socket_t subscriber(zmqcontext, ZMQ_SUB);
+ // zmq sockets are not thread safe. That's why
+ // we create it here, and not at object creation.
+
try {
subscriber.connect(workerdata->uri.c_str());
@@ -99,24 +105,37 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
subscriber.recv(&incoming);
if (queue_size < MAX_QUEUE_SIZE) {
+ if (buffer_full) {
+ fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
+ queue_size);
+ buffer_full = false;
+ }
+
zmq::message_t* holder = new zmq::message_t();
holder->move(&incoming); // move the message into the holder
queue_size = workerdata->in_messages->push(holder);
}
else
{
- workerdata->in_messages->notify();
- fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size);
+ if (!buffer_full) {
+ workerdata->in_messages->notify();
+ fprintf(stderr, "ZeroMQ buffer overfull !\n");
+
+ buffer_full = true;
+ }
}
if (queue_size < 5) {
- fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size);
+ fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n",
+ queue_size);
}
}
}
catch (zmq::error_t& err) {
printf("ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
+
+ subscriber.close();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
@@ -127,8 +146,8 @@ void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
void InputZeroMQWorker::Stop()
{
- subscriber.close();
running = false;
+ recv_thread.join();
}
#endif