diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-02-21 10:49:42 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-02-21 10:49:42 +0100 |
commit | e8c274301aca0bca1b7d1e6086cf29af4645732e (patch) | |
tree | 44a000d7fcafb4c495268e3003e8acd8c9ad0926 /src | |
parent | a0eda7372e525c3b7d292c042155be19c34856f8 (diff) | |
download | dabmod-e8c274301aca0bca1b7d1e6086cf29af4645732e.tar.gz dabmod-e8c274301aca0bca1b7d1e6086cf29af4645732e.tar.bz2 dabmod-e8c274301aca0bca1b7d1e6086cf29af4645732e.zip |
zmq input improvements
Diffstat (limited to 'src')
-rw-r--r-- | src/InputReader.h | 5 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 27 |
2 files changed, 25 insertions, 7 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index ae36899..de8fb78 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -143,15 +143,14 @@ class InputZeroMQWorker { public: InputZeroMQWorker() : - zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {} + zmqcontext(1) {} void Start(struct InputZeroMQThreadData* workerdata); void Stop(); private: void RecvProcess(struct InputZeroMQThreadData* workerdata); bool running; - zmq::context_t zmqcontext; - zmq::socket_t subscriber; + zmq::context_t zmqcontext; // is thread-safe boost::thread recv_thread; }; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 6b81ec2..4b55434 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -88,6 +88,12 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) { size_t queue_size = 0; + bool buffer_full = false; + + zmq::socket_t subscriber(zmqcontext, ZMQ_SUB); + // zmq sockets are not thread safe. That's why + // we create it here, and not at object creation. + try { subscriber.connect(workerdata->uri.c_str()); @@ -99,24 +105,37 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.recv(&incoming); if (queue_size < MAX_QUEUE_SIZE) { + if (buffer_full) { + fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", + queue_size); + buffer_full = false; + } + zmq::message_t* holder = new zmq::message_t(); holder->move(&incoming); // move the message into the holder queue_size = workerdata->in_messages->push(holder); } else { - workerdata->in_messages->notify(); - fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size); + if (!buffer_full) { + workerdata->in_messages->notify(); + fprintf(stderr, "ZeroMQ buffer overfull !\n"); + + buffer_full = true; + } } if (queue_size < 5) { - fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size); + fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", + queue_size); } } } catch (zmq::error_t& err) { printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + + subscriber.close(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) @@ -127,8 +146,8 @@ void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) void InputZeroMQWorker::Stop() { - subscriber.close(); running = false; + recv_thread.join(); } #endif |