summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/InputZeroMQReader.cpp18
1 files changed, 16 insertions, 2 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 763c77c..1418db7 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -134,11 +134,25 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
// zmq sockets are not thread safe. That's why
// we create it here, and not at object creation.
+ bool success = true;
+
try {
subscriber.connect(workerdata->uri.c_str());
+ }
+ catch (zmq::error_t& err) {
+ etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << workerdata->uri << "': '" << err.what() << "'";
+ success = false;
+ }
+ if (success) try {
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+ }
+ catch (zmq::error_t& err) {
+ etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'";
+ success = false;
+ }
+ if (success) try {
while (running)
{
zmq::message_t incoming;
@@ -173,7 +187,6 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
{
etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
dab_msg->buflen[i];
- // TODO error handling
}
else {
std::shared_ptr<std::vector<uint8_t> > buf =
@@ -218,9 +231,10 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
}
catch (zmq::error_t& err) {
- etiLog.level(error) << "ZeroMQ error in RecvProcess: '" << err.what() << "'";
+ etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'";
}
catch (std::exception& err) {
+ etiLog.level(error) << "Exception during receive: '" << err.what() << "'";
}
etiLog.level(info) << "ZeroMQ input worker terminated";