diff options
| -rw-r--r-- | src/InputReader.h | 10 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 93 | 
2 files changed, 46 insertions, 57 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index 98eab2b..63451e5 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -184,7 +184,15 @@ class InputZeroMQReader : public InputReader, public RemoteControllable          std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);          std::string m_uri;          size_t m_max_queued_frames = 0; -        ThreadsafeQueue<std::vector<uint8_t> > m_in_messages; + +        // Either must contain a full ETI frame, or one flag must be set +        struct message_t { +            std::vector<uint8_t> eti_frame; +            bool overflow = false; +            bool timeout = false; +            bool fault = false; +        }; +        ThreadsafeQueue<message_t> m_in_messages;          mutable std::mutex m_last_in_messages_size_mutex;          size_t m_last_in_messages_size = 0; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f6c3c34..3661748 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -78,7 +78,7 @@ InputZeroMQReader::~InputZeroMQReader()      m_running = false;      m_zmqcontext.close();      if (m_recv_thread.joinable()) { -            m_recv_thread.join(); +        m_recv_thread.join();      }  } @@ -94,6 +94,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)      m_max_queued_frames = max_queued_frames; +    m_running = true;      m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this);      return 0; @@ -102,10 +103,10 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)  int InputZeroMQReader::GetNextFrame(void* buffer)  {      if (not m_running) { -        return 0; +        throw runtime_error("ZMQ input is not ready yet");      } -    vector<uint8_t> incoming; +    message_t incoming;      /* Do some prebuffering because reads will happen in bursts       * (4 ETI frames in TM1) and we should make sure that @@ -122,27 +123,29 @@ int InputZeroMQReader::GetNextFrame(void* buffer)      }      etiLog.log(trace, "ZMQ,pop"); -    if (not m_running) { -        throw zmq_input_overflow(); -    } - +    constexpr size_t framesize = 6144; -    const size_t framesize = 6144; -    if (incoming.empty()) { +    if (incoming.timeout) {          return 0;      } -    else if (incoming.size() == framesize) { +    else if (incoming.fault) { +        throw runtime_error("ZMQ input has terminated"); +    } +    else if (incoming.overflow) { +        throw zmq_input_overflow(); +    } +    else if (incoming.eti_frame.size() == framesize) {          unique_lock<mutex> lock(m_last_in_messages_size_mutex);          m_last_in_messages_size--;          lock.unlock(); -        memcpy(buffer, &incoming.front(), framesize); +        memcpy(buffer, &incoming.eti_frame.front(), framesize); + +        return framesize;      }      else {          throw logic_error("ZMQ ETI not 6144");      } - -    return framesize;  }  std::string InputZeroMQReader::GetPrintableInfo() const @@ -153,10 +156,8 @@ std::string InputZeroMQReader::GetPrintableInfo() const  void InputZeroMQReader::RecvProcess()  {      set_thread_name("zmqinput"); -    m_running = true;      size_t queue_size = 0; -    bool buffer_full = false;      zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB);      // zmq sockets are not thread safe. That's why @@ -167,7 +168,7 @@ void InputZeroMQReader::RecvProcess()      try {          subscriber.connect(m_uri.c_str());      } -    catch (zmq::error_t& err) { +    catch (const zmq::error_t& err) {          etiLog.level(error) << "Failed to connect ZeroMQ socket to '" <<              m_uri << "': '" << err.what() << "'";          success = false; @@ -177,7 +178,7 @@ void InputZeroMQReader::RecvProcess()          // subscribe to all messages          subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);      } -    catch (zmq::error_t& err) { +    catch (const zmq::error_t& err) {          etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" <<              err.what() << "'";          success = false; @@ -192,28 +193,15 @@ void InputZeroMQReader::RecvProcess()              const int zmq_timeout_ms = 100;              const int num_events = zmq::poll(items, 1, zmq_timeout_ms);              if (num_events == 0) { -                // timeout is signalled by an empty buffer -                vector<uint8_t> buf; -                m_in_messages.push(buf); +                message_t msg; +                msg.timeout = true; +                m_in_messages.push(move(msg));                  continue;              }              subscriber.recv(&incoming); -            if (m_to_drop) { -                queue_size = m_in_messages.size(); -                if (queue_size > 4) { -                    m_in_messages.notify(); -                } -                m_to_drop--; -            } -            else if (queue_size < m_max_queued_frames) { -                if (buffer_full) { -                    etiLog.level(info) << "ZeroMQ buffer recovered: " << -                        queue_size << " elements"; -                    buffer_full = false; -                } - +            if (queue_size < m_max_queued_frames) {                  if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) {                      throw runtime_error("ZeroMQ packet too small for header");                  } @@ -251,7 +239,9 @@ void InputZeroMQReader::RecvProcess()                              offset += framesize; -                            queue_size = m_in_messages.push(move(buf)); +                            message_t msg; +                            msg.eti_frame = move(buf); +                            queue_size = m_in_messages.push(move(msg));                              etiLog.log(trace, "ZMQ,push %zu", queue_size);                              unique_lock<mutex> lock(m_last_in_messages_size_mutex); @@ -261,23 +251,11 @@ void InputZeroMQReader::RecvProcess()                  }              }              else { -                m_in_messages.notify(); - -                if (!buffer_full) { -                    etiLog.level(warn) << "ZeroMQ buffer overfull !"; - -                    buffer_full = true; -                    throw runtime_error("ZMQ input full"); -                } - -                queue_size = m_in_messages.size(); - -                /* Drop three more incoming ETI frames before -                 * we start accepting them again, to guarantee -                 * that we keep transmission frame vs. ETI frame -                 * phase. -                 */ -                m_to_drop = 3; +                message_t msg; +                msg.overflow = true; +                queue_size = m_in_messages.push(move(msg)); +                etiLog.level(warn) << "ZeroMQ buffer overfull !"; +                throw runtime_error("ZMQ input full");              }              if (queue_size < 5) { @@ -285,19 +263,22 @@ void InputZeroMQReader::RecvProcess()              }          }      } -    catch (zmq::error_t& err) { +    catch (const zmq::error_t& err) {          etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'";      } -    catch (std::exception& err) { +    catch (const std::exception& err) {          etiLog.level(error) << "Exception during receive: '" << err.what() << "'";      } +    m_running = false; +      etiLog.level(info) << "ZeroMQ input worker terminated";      subscriber.close(); -    m_running = false; -    m_in_messages.notify(); +    message_t msg; +    msg.fault = true; +    queue_size = m_in_messages.push(move(msg));  }  // =======================================  | 
