diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-02-27 10:30:57 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-02-27 10:30:57 +0100 |
commit | 6db4e5b3ad28973601b8e7426a4cffa01322b9a2 (patch) | |
tree | 20ad9a5dc6a18dd513f4a254f7a86ddd2eba52f3 /src/InputZeroMQReader.cpp | |
parent | 201d711a1d3dfbe46d622871731005937598e790 (diff) | |
parent | c3dbbec39aef32789aacb872c88801f0c5d15ef7 (diff) | |
download | dabmod-6db4e5b3ad28973601b8e7426a4cffa01322b9a2.tar.gz dabmod-6db4e5b3ad28973601b8e7426a4cffa01322b9a2.tar.bz2 dabmod-6db4e5b3ad28973601b8e7426a4cffa01322b9a2.zip |
Merge branch 'next' into lime
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 93 |
1 files changed, 37 insertions, 56 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f6c3c34..3661748 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -78,7 +78,7 @@ InputZeroMQReader::~InputZeroMQReader() m_running = false; m_zmqcontext.close(); if (m_recv_thread.joinable()) { - m_recv_thread.join(); + m_recv_thread.join(); } } @@ -94,6 +94,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) m_max_queued_frames = max_queued_frames; + m_running = true; m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this); return 0; @@ -102,10 +103,10 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) int InputZeroMQReader::GetNextFrame(void* buffer) { if (not m_running) { - return 0; + throw runtime_error("ZMQ input is not ready yet"); } - vector<uint8_t> incoming; + message_t incoming; /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that @@ -122,27 +123,29 @@ int InputZeroMQReader::GetNextFrame(void* buffer) } etiLog.log(trace, "ZMQ,pop"); - if (not m_running) { - throw zmq_input_overflow(); - } - + constexpr size_t framesize = 6144; - const size_t framesize = 6144; - if (incoming.empty()) { + if (incoming.timeout) { return 0; } - else if (incoming.size() == framesize) { + else if (incoming.fault) { + throw runtime_error("ZMQ input has terminated"); + } + else if (incoming.overflow) { + throw zmq_input_overflow(); + } + else if (incoming.eti_frame.size() == framesize) { unique_lock<mutex> lock(m_last_in_messages_size_mutex); m_last_in_messages_size--; lock.unlock(); - memcpy(buffer, &incoming.front(), framesize); + memcpy(buffer, &incoming.eti_frame.front(), framesize); + + return framesize; } else { throw logic_error("ZMQ ETI not 6144"); } - - return framesize; } std::string InputZeroMQReader::GetPrintableInfo() const @@ -153,10 +156,8 @@ std::string InputZeroMQReader::GetPrintableInfo() const void InputZeroMQReader::RecvProcess() { set_thread_name("zmqinput"); - m_running = true; size_t queue_size = 0; - bool buffer_full = false; zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB); // zmq sockets are not thread safe. That's why @@ -167,7 +168,7 @@ void InputZeroMQReader::RecvProcess() try { subscriber.connect(m_uri.c_str()); } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << m_uri << "': '" << err.what() << "'"; success = false; @@ -177,7 +178,7 @@ void InputZeroMQReader::RecvProcess() // subscribe to all messages subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'"; success = false; @@ -192,28 +193,15 @@ void InputZeroMQReader::RecvProcess() const int zmq_timeout_ms = 100; const int num_events = zmq::poll(items, 1, zmq_timeout_ms); if (num_events == 0) { - // timeout is signalled by an empty buffer - vector<uint8_t> buf; - m_in_messages.push(buf); + message_t msg; + msg.timeout = true; + m_in_messages.push(move(msg)); continue; } subscriber.recv(&incoming); - if (m_to_drop) { - queue_size = m_in_messages.size(); - if (queue_size > 4) { - m_in_messages.notify(); - } - m_to_drop--; - } - else if (queue_size < m_max_queued_frames) { - if (buffer_full) { - etiLog.level(info) << "ZeroMQ buffer recovered: " << - queue_size << " elements"; - buffer_full = false; - } - + if (queue_size < m_max_queued_frames) { if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { throw runtime_error("ZeroMQ packet too small for header"); } @@ -251,7 +239,9 @@ void InputZeroMQReader::RecvProcess() offset += framesize; - queue_size = m_in_messages.push(move(buf)); + message_t msg; + msg.eti_frame = move(buf); + queue_size = m_in_messages.push(move(msg)); etiLog.log(trace, "ZMQ,push %zu", queue_size); unique_lock<mutex> lock(m_last_in_messages_size_mutex); @@ -261,23 +251,11 @@ void InputZeroMQReader::RecvProcess() } } else { - m_in_messages.notify(); - - if (!buffer_full) { - etiLog.level(warn) << "ZeroMQ buffer overfull !"; - - buffer_full = true; - throw runtime_error("ZMQ input full"); - } - - queue_size = m_in_messages.size(); - - /* Drop three more incoming ETI frames before - * we start accepting them again, to guarantee - * that we keep transmission frame vs. ETI frame - * phase. - */ - m_to_drop = 3; + message_t msg; + msg.overflow = true; + queue_size = m_in_messages.push(move(msg)); + etiLog.level(warn) << "ZeroMQ buffer overfull !"; + throw runtime_error("ZMQ input full"); } if (queue_size < 5) { @@ -285,19 +263,22 @@ void InputZeroMQReader::RecvProcess() } } } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'"; } - catch (std::exception& err) { + catch (const std::exception& err) { etiLog.level(error) << "Exception during receive: '" << err.what() << "'"; } + m_running = false; + etiLog.level(info) << "ZeroMQ input worker terminated"; subscriber.close(); - m_running = false; - m_in_messages.notify(); + message_t msg; + msg.fault = true; + queue_size = m_in_messages.push(move(msg)); } // ======================================= |