/* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org */ /* This file is part of ODR-DabMod. ODR-DabMod is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ODR-DabMod is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ODR-DabMod. If not, see . */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #if defined(HAVE_ZEROMQ) #include #include #include #include #include "zmq.hpp" #include "InputReader.h" #include "PcDebug.h" #include "Utils.h" using namespace std; #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. * * Four frames in one zmq message are sent, so that * we do not risk breaking ETI vs. transmission frame * phase. * * The frames are concatenated in buf, and * their sizes is given in the buflen array. * * Most of the time, the buf will not be completely * filled */ struct zmq_dab_message_t { uint32_t version; uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) InputZeroMQReader::InputZeroMQReader() : InputReader(), RemoteControllable("inputzmq") { RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)"); } InputZeroMQReader::~InputZeroMQReader() { m_running = false; m_zmqcontext.close(); if (m_recv_thread.joinable()) { m_recv_thread.join(); } } int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) { // The URL might start with zmq+tcp:// if (uri.substr(0, 4) == "zmq+") { m_uri = uri.substr(4); } else { m_uri = uri; } m_max_queued_frames = max_queued_frames; m_running = true; m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this); return 0; } int InputZeroMQReader::GetNextFrame(void* buffer) { if (not m_running) { throw runtime_error("ZMQ input is not ready yet"); } message_t incoming; /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that * we can serve the data required for a full transmission frame. */ if (m_in_messages.size() < 4) { const size_t prebuffering = 10; etiLog.log(trace, "ZMQ,wait1"); m_in_messages.wait_and_pop(incoming, prebuffering); } else { etiLog.log(trace, "ZMQ,wait2"); m_in_messages.wait_and_pop(incoming); } etiLog.log(trace, "ZMQ,pop"); constexpr size_t framesize = 6144; if (incoming.timeout) { return 0; } else if (incoming.fault) { throw runtime_error("ZMQ input has terminated"); } else if (incoming.overflow) { throw zmq_input_overflow(); } else if (incoming.eti_frame.size() == framesize) { unique_lock lock(m_last_in_messages_size_mutex); m_last_in_messages_size--; lock.unlock(); memcpy(buffer, &incoming.eti_frame.front(), framesize); return framesize; } else { throw logic_error("ZMQ ETI not 6144"); } } std::string InputZeroMQReader::GetPrintableInfo() const { return "Input ZeroMQ: Receiving from " + m_uri; } void InputZeroMQReader::RecvProcess() { set_thread_name("zmqinput"); size_t queue_size = 0; zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB); // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. bool success = true; try { subscriber.connect(m_uri.c_str()); } catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << m_uri << "': '" << err.what() << "'"; success = false; } if (success) try { // subscribe to all messages subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'"; success = false; } if (success) try { while (m_running) { zmq::message_t incoming; zmq::pollitem_t items[1]; items[0].socket = subscriber; items[0].events = ZMQ_POLLIN; const int zmq_timeout_ms = 100; const int num_events = zmq::poll(items, 1, zmq_timeout_ms); if (num_events == 0) { message_t msg; msg.timeout = true; m_in_messages.push(move(msg)); continue; } subscriber.recv(&incoming); if (queue_size < m_max_queued_frames) { if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { throw runtime_error("ZeroMQ packet too small for header"); } else { const zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); if (dab_msg->version != 1) { etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; } int offset = sizeof(dab_msg->version) + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { if (dab_msg->buflen[i] > 6144) { stringstream ss; ss << "ZeroMQ buffer " << i << " has invalid buflen " << dab_msg->buflen[i]; throw runtime_error(ss.str()); } else { vector buf(6144, 0x55); const int framesize = dab_msg->buflen[i]; if ((ssize_t)incoming.size() < offset + framesize) { throw runtime_error("ZeroMQ packet too small"); } memcpy(&buf.front(), ((uint8_t*)incoming.data()) + offset, framesize); offset += framesize; message_t msg; msg.eti_frame = move(buf); queue_size = m_in_messages.push(move(msg)); etiLog.log(trace, "ZMQ,push %zu", queue_size); unique_lock lock(m_last_in_messages_size_mutex); m_last_in_messages_size++; } } } } else { message_t msg; msg.overflow = true; queue_size = m_in_messages.push(move(msg)); etiLog.level(warn) << "ZeroMQ buffer overfull !"; throw runtime_error("ZMQ input full"); } if (queue_size < 5) { etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; } } } catch (const zmq::error_t& err) { etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'"; } catch (const std::exception& err) { etiLog.level(error) << "Exception during receive: '" << err.what() << "'"; } m_running = false; etiLog.level(info) << "ZeroMQ input worker terminated"; subscriber.close(); message_t msg; msg.fault = true; queue_size = m_in_messages.push(move(msg)); } // ======================================= // Remote Control // ======================================= void InputZeroMQReader::set_parameter(const string& parameter, const string& value) { stringstream ss(value); ss.exceptions ( stringstream::failbit | stringstream::badbit ); if (parameter == "buffer") { throw ParameterError("Parameter " + parameter + " is read-only."); } else { stringstream ss_err; ss_err << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); throw ParameterError(ss_err.str()); } } const string InputZeroMQReader::get_parameter(const string& parameter) const { stringstream ss; ss << std::fixed; if (parameter == "buffer") { // Do not use size of the queue, as it will contain empty // frames to signal timeouts unique_lock lock(m_last_in_messages_size_mutex); const long time_in_buffer_us = 24000 * m_last_in_messages_size; ss << time_in_buffer_us; } else { ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); throw ParameterError(ss.str()); } return ss.str(); } #endif