summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-02-21 10:49:42 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-02-21 10:49:42 +0100
commite8c274301aca0bca1b7d1e6086cf29af4645732e (patch)
tree44a000d7fcafb4c495268e3003e8acd8c9ad0926 /src
parenta0eda7372e525c3b7d292c042155be19c34856f8 (diff)
downloaddabmod-e8c274301aca0bca1b7d1e6086cf29af4645732e.tar.gz
dabmod-e8c274301aca0bca1b7d1e6086cf29af4645732e.tar.bz2
dabmod-e8c274301aca0bca1b7d1e6086cf29af4645732e.zip
zmq input improvements
Diffstat (limited to 'src')
-rw-r--r--src/InputReader.h5
-rw-r--r--src/InputZeroMQReader.cpp27
2 files changed, 25 insertions, 7 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index ae36899..de8fb78 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -143,15 +143,14 @@ class InputZeroMQWorker
{
public:
InputZeroMQWorker() :
- zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {}
+ zmqcontext(1) {}
void Start(struct InputZeroMQThreadData* workerdata);
void Stop();
private:
void RecvProcess(struct InputZeroMQThreadData* workerdata);
bool running;
- zmq::context_t zmqcontext;
- zmq::socket_t subscriber;
+ zmq::context_t zmqcontext; // is thread-safe
boost::thread recv_thread;
};
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 6b81ec2..4b55434 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -88,6 +88,12 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
{
size_t queue_size = 0;
+ bool buffer_full = false;
+
+ zmq::socket_t subscriber(zmqcontext, ZMQ_SUB);
+ // zmq sockets are not thread safe. That's why
+ // we create it here, and not at object creation.
+
try {
subscriber.connect(workerdata->uri.c_str());
@@ -99,24 +105,37 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
subscriber.recv(&incoming);
if (queue_size < MAX_QUEUE_SIZE) {
+ if (buffer_full) {
+ fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
+ queue_size);
+ buffer_full = false;
+ }
+
zmq::message_t* holder = new zmq::message_t();
holder->move(&incoming); // move the message into the holder
queue_size = workerdata->in_messages->push(holder);
}
else
{
- workerdata->in_messages->notify();
- fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size);
+ if (!buffer_full) {
+ workerdata->in_messages->notify();
+ fprintf(stderr, "ZeroMQ buffer overfull !\n");
+
+ buffer_full = true;
+ }
}
if (queue_size < 5) {
- fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size);
+ fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n",
+ queue_size);
}
}
}
catch (zmq::error_t& err) {
printf("ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
+
+ subscriber.close();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
@@ -127,8 +146,8 @@ void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
void InputZeroMQWorker::Stop()
{
- subscriber.close();
running = false;
+ recv_thread.join();
}
#endif