From aec2a980fc37bd5d28f06c8e48a98b51ae65ac32 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 28 Jan 2019 17:13:28 +0100 Subject: Represent ZMQ input failures more clearly --- src/InputReader.h | 10 ++++- src/InputZeroMQReader.cpp | 93 +++++++++++++++++++---------------------------- 2 files changed, 46 insertions(+), 57 deletions(-) (limited to 'src') diff --git a/src/InputReader.h b/src/InputReader.h index 98eab2b..63451e5 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -184,7 +184,15 @@ class InputZeroMQReader : public InputReader, public RemoteControllable std::atomic m_running = ATOMIC_VAR_INIT(false); std::string m_uri; size_t m_max_queued_frames = 0; - ThreadsafeQueue > m_in_messages; + + // Either must contain a full ETI frame, or one flag must be set + struct message_t { + std::vector eti_frame; + bool overflow = false; + bool timeout = false; + bool fault = false; + }; + ThreadsafeQueue m_in_messages; mutable std::mutex m_last_in_messages_size_mutex; size_t m_last_in_messages_size = 0; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f6c3c34..3661748 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -78,7 +78,7 @@ InputZeroMQReader::~InputZeroMQReader() m_running = false; m_zmqcontext.close(); if (m_recv_thread.joinable()) { - m_recv_thread.join(); + m_recv_thread.join(); } } @@ -94,6 +94,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) m_max_queued_frames = max_queued_frames; + m_running = true; m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this); return 0; @@ -102,10 +103,10 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) int InputZeroMQReader::GetNextFrame(void* buffer) { if (not m_running) { - return 0; + throw runtime_error("ZMQ input is not ready yet"); } - vector incoming; + message_t incoming; /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that @@ -122,27 +123,29 @@ int InputZeroMQReader::GetNextFrame(void* buffer) } etiLog.log(trace, "ZMQ,pop"); - if (not m_running) { - throw zmq_input_overflow(); - } - + constexpr size_t framesize = 6144; - const size_t framesize = 6144; - if (incoming.empty()) { + if (incoming.timeout) { return 0; } - else if (incoming.size() == framesize) { + else if (incoming.fault) { + throw runtime_error("ZMQ input has terminated"); + } + else if (incoming.overflow) { + throw zmq_input_overflow(); + } + else if (incoming.eti_frame.size() == framesize) { unique_lock lock(m_last_in_messages_size_mutex); m_last_in_messages_size--; lock.unlock(); - memcpy(buffer, &incoming.front(), framesize); + memcpy(buffer, &incoming.eti_frame.front(), framesize); + + return framesize; } else { throw logic_error("ZMQ ETI not 6144"); } - - return framesize; } std::string InputZeroMQReader::GetPrintableInfo() const @@ -153,10 +156,8 @@ std::string InputZeroMQReader::GetPrintableInfo() const void InputZeroMQReader::RecvProcess() { set_thread_name("zmqinput"); - m_running = true; size_t queue_size = 0; - bool buffer_full = false; zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB); // zmq sockets are not thread safe. That's why @@ -167,7 +168,7 @@ void InputZeroMQReader::RecvProcess() try { subscriber.connect(m_uri.c_str()); } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << m_uri << "': '" << err.what() << "'"; success = false; @@ -177,7 +178,7 @@ void InputZeroMQReader::RecvProcess() // subscribe to all messages subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'"; success = false; @@ -192,28 +193,15 @@ void InputZeroMQReader::RecvProcess() const int zmq_timeout_ms = 100; const int num_events = zmq::poll(items, 1, zmq_timeout_ms); if (num_events == 0) { - // timeout is signalled by an empty buffer - vector buf; - m_in_messages.push(buf); + message_t msg; + msg.timeout = true; + m_in_messages.push(move(msg)); continue; } subscriber.recv(&incoming); - if (m_to_drop) { - queue_size = m_in_messages.size(); - if (queue_size > 4) { - m_in_messages.notify(); - } - m_to_drop--; - } - else if (queue_size < m_max_queued_frames) { - if (buffer_full) { - etiLog.level(info) << "ZeroMQ buffer recovered: " << - queue_size << " elements"; - buffer_full = false; - } - + if (queue_size < m_max_queued_frames) { if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { throw runtime_error("ZeroMQ packet too small for header"); } @@ -251,7 +239,9 @@ void InputZeroMQReader::RecvProcess() offset += framesize; - queue_size = m_in_messages.push(move(buf)); + message_t msg; + msg.eti_frame = move(buf); + queue_size = m_in_messages.push(move(msg)); etiLog.log(trace, "ZMQ,push %zu", queue_size); unique_lock lock(m_last_in_messages_size_mutex); @@ -261,23 +251,11 @@ void InputZeroMQReader::RecvProcess() } } else { - m_in_messages.notify(); - - if (!buffer_full) { - etiLog.level(warn) << "ZeroMQ buffer overfull !"; - - buffer_full = true; - throw runtime_error("ZMQ input full"); - } - - queue_size = m_in_messages.size(); - - /* Drop three more incoming ETI frames before - * we start accepting them again, to guarantee - * that we keep transmission frame vs. ETI frame - * phase. - */ - m_to_drop = 3; + message_t msg; + msg.overflow = true; + queue_size = m_in_messages.push(move(msg)); + etiLog.level(warn) << "ZeroMQ buffer overfull !"; + throw runtime_error("ZMQ input full"); } if (queue_size < 5) { @@ -285,19 +263,22 @@ void InputZeroMQReader::RecvProcess() } } } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'"; } - catch (std::exception& err) { + catch (const std::exception& err) { etiLog.level(error) << "Exception during receive: '" << err.what() << "'"; } + m_running = false; + etiLog.level(info) << "ZeroMQ input worker terminated"; subscriber.close(); - m_running = false; - m_in_messages.notify(); + message_t msg; + msg.fault = true; + queue_size = m_in_messages.push(move(msg)); } // ======================================= -- cgit v1.2.3