summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/InputReader.h10
-rw-r--r--src/InputZeroMQReader.cpp93
2 files changed, 46 insertions, 57 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index 98eab2b..63451e5 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -184,7 +184,15 @@ class InputZeroMQReader : public InputReader, public RemoteControllable
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_uri;
size_t m_max_queued_frames = 0;
- ThreadsafeQueue<std::vector<uint8_t> > m_in_messages;
+
+ // Either must contain a full ETI frame, or one flag must be set
+ struct message_t {
+ std::vector<uint8_t> eti_frame;
+ bool overflow = false;
+ bool timeout = false;
+ bool fault = false;
+ };
+ ThreadsafeQueue<message_t> m_in_messages;
mutable std::mutex m_last_in_messages_size_mutex;
size_t m_last_in_messages_size = 0;
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index f6c3c34..3661748 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -78,7 +78,7 @@ InputZeroMQReader::~InputZeroMQReader()
m_running = false;
m_zmqcontext.close();
if (m_recv_thread.joinable()) {
- m_recv_thread.join();
+ m_recv_thread.join();
}
}
@@ -94,6 +94,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
m_max_queued_frames = max_queued_frames;
+ m_running = true;
m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this);
return 0;
@@ -102,10 +103,10 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
int InputZeroMQReader::GetNextFrame(void* buffer)
{
if (not m_running) {
- return 0;
+ throw runtime_error("ZMQ input is not ready yet");
}
- vector<uint8_t> incoming;
+ message_t incoming;
/* Do some prebuffering because reads will happen in bursts
* (4 ETI frames in TM1) and we should make sure that
@@ -122,27 +123,29 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
}
etiLog.log(trace, "ZMQ,pop");
- if (not m_running) {
- throw zmq_input_overflow();
- }
-
+ constexpr size_t framesize = 6144;
- const size_t framesize = 6144;
- if (incoming.empty()) {
+ if (incoming.timeout) {
return 0;
}
- else if (incoming.size() == framesize) {
+ else if (incoming.fault) {
+ throw runtime_error("ZMQ input has terminated");
+ }
+ else if (incoming.overflow) {
+ throw zmq_input_overflow();
+ }
+ else if (incoming.eti_frame.size() == framesize) {
unique_lock<mutex> lock(m_last_in_messages_size_mutex);
m_last_in_messages_size--;
lock.unlock();
- memcpy(buffer, &incoming.front(), framesize);
+ memcpy(buffer, &incoming.eti_frame.front(), framesize);
+
+ return framesize;
}
else {
throw logic_error("ZMQ ETI not 6144");
}
-
- return framesize;
}
std::string InputZeroMQReader::GetPrintableInfo() const
@@ -153,10 +156,8 @@ std::string InputZeroMQReader::GetPrintableInfo() const
void InputZeroMQReader::RecvProcess()
{
set_thread_name("zmqinput");
- m_running = true;
size_t queue_size = 0;
- bool buffer_full = false;
zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB);
// zmq sockets are not thread safe. That's why
@@ -167,7 +168,7 @@ void InputZeroMQReader::RecvProcess()
try {
subscriber.connect(m_uri.c_str());
}
- catch (zmq::error_t& err) {
+ catch (const zmq::error_t& err) {
etiLog.level(error) << "Failed to connect ZeroMQ socket to '" <<
m_uri << "': '" << err.what() << "'";
success = false;
@@ -177,7 +178,7 @@ void InputZeroMQReader::RecvProcess()
// subscribe to all messages
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
- catch (zmq::error_t& err) {
+ catch (const zmq::error_t& err) {
etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" <<
err.what() << "'";
success = false;
@@ -192,28 +193,15 @@ void InputZeroMQReader::RecvProcess()
const int zmq_timeout_ms = 100;
const int num_events = zmq::poll(items, 1, zmq_timeout_ms);
if (num_events == 0) {
- // timeout is signalled by an empty buffer
- vector<uint8_t> buf;
- m_in_messages.push(buf);
+ message_t msg;
+ msg.timeout = true;
+ m_in_messages.push(move(msg));
continue;
}
subscriber.recv(&incoming);
- if (m_to_drop) {
- queue_size = m_in_messages.size();
- if (queue_size > 4) {
- m_in_messages.notify();
- }
- m_to_drop--;
- }
- else if (queue_size < m_max_queued_frames) {
- if (buffer_full) {
- etiLog.level(info) << "ZeroMQ buffer recovered: " <<
- queue_size << " elements";
- buffer_full = false;
- }
-
+ if (queue_size < m_max_queued_frames) {
if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) {
throw runtime_error("ZeroMQ packet too small for header");
}
@@ -251,7 +239,9 @@ void InputZeroMQReader::RecvProcess()
offset += framesize;
- queue_size = m_in_messages.push(move(buf));
+ message_t msg;
+ msg.eti_frame = move(buf);
+ queue_size = m_in_messages.push(move(msg));
etiLog.log(trace, "ZMQ,push %zu", queue_size);
unique_lock<mutex> lock(m_last_in_messages_size_mutex);
@@ -261,23 +251,11 @@ void InputZeroMQReader::RecvProcess()
}
}
else {
- m_in_messages.notify();
-
- if (!buffer_full) {
- etiLog.level(warn) << "ZeroMQ buffer overfull !";
-
- buffer_full = true;
- throw runtime_error("ZMQ input full");
- }
-
- queue_size = m_in_messages.size();
-
- /* Drop three more incoming ETI frames before
- * we start accepting them again, to guarantee
- * that we keep transmission frame vs. ETI frame
- * phase.
- */
- m_to_drop = 3;
+ message_t msg;
+ msg.overflow = true;
+ queue_size = m_in_messages.push(move(msg));
+ etiLog.level(warn) << "ZeroMQ buffer overfull !";
+ throw runtime_error("ZMQ input full");
}
if (queue_size < 5) {
@@ -285,19 +263,22 @@ void InputZeroMQReader::RecvProcess()
}
}
}
- catch (zmq::error_t& err) {
+ catch (const zmq::error_t& err) {
etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'";
}
- catch (std::exception& err) {
+ catch (const std::exception& err) {
etiLog.level(error) << "Exception during receive: '" << err.what() << "'";
}
+ m_running = false;
+
etiLog.level(info) << "ZeroMQ input worker terminated";
subscriber.close();
- m_running = false;
- m_in_messages.notify();
+ message_t msg;
+ msg.fault = true;
+ queue_size = m_in_messages.push(move(msg));
}
// =======================================