/* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org */ /* This file is part of ODR-DabMod. ODR-DabMod is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ODR-DabMod is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ODR-DabMod. If not, see . */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #if defined(HAVE_ZEROMQ) #include #include #include #include #include "zmq.hpp" #include #include "porting.h" #include "InputReader.h" #include "PcDebug.h" #include "Utils.h" using namespace std; #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. * * Four frames in one zmq message are sent, so that * we do not risk breaking ETI vs. transmission frame * phase. * * The frames are concatenated in buf, and * their sizes is given in the buflen array. * * Most of the time, the buf will not be completely * filled */ struct zmq_dab_message_t { uint32_t version; uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) InputZeroMQReader::~InputZeroMQReader() { m_running = false; m_zmqcontext.close(); if (m_recv_thread.joinable()) { m_recv_thread.join(); } } int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) { // The URL might start with zmq+tcp:// if (uri.substr(0, 4) == "zmq+") { m_uri = uri.substr(4); } else { m_uri = uri; } m_max_queued_frames = max_queued_frames; m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this); return 0; } int InputZeroMQReader::GetNextFrame(void* buffer) { const size_t framesize = 6144; if (not m_running) { return 0; } shared_ptr > incoming; /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that * we can serve the data required for a full transmission frame. */ if (m_in_messages.size() < 4) { const size_t prebuffering = 10; etiLog.log(trace, "ZMQ,wait1"); m_in_messages.wait_and_pop(incoming, prebuffering); } else { etiLog.log(trace, "ZMQ,wait2"); m_in_messages.wait_and_pop(incoming); } etiLog.log(trace, "ZMQ,pop"); if (not m_running) { throw zmq_input_overflow(); } memcpy(buffer, &incoming->front(), framesize); return framesize; } void InputZeroMQReader::PrintInfo() const { fprintf(stderr, "Input ZeroMQ:\n"); fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str()); } void InputZeroMQReader::RecvProcess() { set_thread_name("zmqinput"); m_running = true; size_t queue_size = 0; bool buffer_full = false; zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB); // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. bool success = true; try { subscriber.connect(m_uri.c_str()); } catch (zmq::error_t& err) { etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << m_uri << "': '" << err.what() << "'"; success = false; } if (success) try { // subscribe to all messages subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } catch (zmq::error_t& err) { etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'"; success = false; } if (success) try { while (m_running) { zmq::message_t incoming; subscriber.recv(&incoming); if (m_to_drop) { queue_size = m_in_messages.size(); if (queue_size > 4) { m_in_messages.notify(); } m_to_drop--; } else if (queue_size < m_max_queued_frames) { if (buffer_full) { etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements"; buffer_full = false; } if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { throw runtime_error("ZeroMQ packet too small for header"); } else { const zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); if (dab_msg->version != 1) { etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; } int offset = sizeof(dab_msg->version) + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { if (dab_msg->buflen[i] > 6144) { stringstream ss; ss << "ZeroMQ buffer " << i << " has invalid buflen " << dab_msg->buflen[i]; throw runtime_error(ss.str()); } else { auto buf = make_shared >(6144, 0x55); const int framesize = dab_msg->buflen[i]; if ((ssize_t)incoming.size() < offset + framesize) { throw runtime_error("ZeroMQ packet too small"); } memcpy(&buf->front(), ((uint8_t*)incoming.data()) + offset, framesize); offset += framesize; queue_size = m_in_messages.push(buf); etiLog.log(trace, "ZMQ,push %zu", queue_size); } } } } else { m_in_messages.notify(); if (!buffer_full) { etiLog.level(warn) << "ZeroMQ buffer overfull !"; buffer_full = true; throw runtime_error("ZMQ input full"); } queue_size = m_in_messages.size(); /* Drop three more incoming ETI frames before * we start accepting them again, to guarantee * that we keep transmission frame vs. ETI frame * phase. */ m_to_drop = 3; } if (queue_size < 5) { etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; } } } catch (zmq::error_t& err) { etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'"; } catch (std::exception& err) { etiLog.level(error) << "Exception during receive: '" << err.what() << "'"; } etiLog.level(info) << "ZeroMQ input worker terminated"; subscriber.close(); m_running = false; m_in_messages.notify(); } #endif