From e8c274301aca0bca1b7d1e6086cf29af4645732e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 21 Feb 2014 10:49:42 +0100 Subject: zmq input improvements --- src/InputReader.h | 5 ++--- src/InputZeroMQReader.cpp | 27 +++++++++++++++++++++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/InputReader.h b/src/InputReader.h index ae36899..de8fb78 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -143,15 +143,14 @@ class InputZeroMQWorker { public: InputZeroMQWorker() : - zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {} + zmqcontext(1) {} void Start(struct InputZeroMQThreadData* workerdata); void Stop(); private: void RecvProcess(struct InputZeroMQThreadData* workerdata); bool running; - zmq::context_t zmqcontext; - zmq::socket_t subscriber; + zmq::context_t zmqcontext; // is thread-safe boost::thread recv_thread; }; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 6b81ec2..4b55434 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -88,6 +88,12 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) { size_t queue_size = 0; + bool buffer_full = false; + + zmq::socket_t subscriber(zmqcontext, ZMQ_SUB); + // zmq sockets are not thread safe. That's why + // we create it here, and not at object creation. + try { subscriber.connect(workerdata->uri.c_str()); @@ -99,24 +105,37 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.recv(&incoming); if (queue_size < MAX_QUEUE_SIZE) { + if (buffer_full) { + fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", + queue_size); + buffer_full = false; + } + zmq::message_t* holder = new zmq::message_t(); holder->move(&incoming); // move the message into the holder queue_size = workerdata->in_messages->push(holder); } else { - workerdata->in_messages->notify(); - fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size); + if (!buffer_full) { + workerdata->in_messages->notify(); + fprintf(stderr, "ZeroMQ buffer overfull !\n"); + + buffer_full = true; + } } if (queue_size < 5) { - fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size); + fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", + queue_size); } } } catch (zmq::error_t& err) { printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + + subscriber.close(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) @@ -127,8 +146,8 @@ void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) void InputZeroMQWorker::Stop() { - subscriber.close(); running = false; + recv_thread.join(); } #endif -- cgit v1.2.3