diff options
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 27 |
1 files changed, 23 insertions, 4 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 6b81ec2..4b55434 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -88,6 +88,12 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) { size_t queue_size = 0; + bool buffer_full = false; + + zmq::socket_t subscriber(zmqcontext, ZMQ_SUB); + // zmq sockets are not thread safe. That's why + // we create it here, and not at object creation. + try { subscriber.connect(workerdata->uri.c_str()); @@ -99,24 +105,37 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.recv(&incoming); if (queue_size < MAX_QUEUE_SIZE) { + if (buffer_full) { + fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", + queue_size); + buffer_full = false; + } + zmq::message_t* holder = new zmq::message_t(); holder->move(&incoming); // move the message into the holder queue_size = workerdata->in_messages->push(holder); } else { - workerdata->in_messages->notify(); - fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size); + if (!buffer_full) { + workerdata->in_messages->notify(); + fprintf(stderr, "ZeroMQ buffer overfull !\n"); + + buffer_full = true; + } } if (queue_size < 5) { - fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size); + fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", + queue_size); } } } catch (zmq::error_t& err) { printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + + subscriber.close(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) @@ -127,8 +146,8 @@ void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) void InputZeroMQWorker::Stop() { - subscriber.close(); running = false; + recv_thread.join(); } #endif |