From 0aec6da11b4add62ac473e3f4ea813bb4a8a556d Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 1 Feb 2023 14:03:03 +0100 Subject: Remove ZeroMQ input --- src/ConfigParser.cpp | 7 +- src/ConfigParser.h | 5 +- src/DabMod.cpp | 71 +--------- src/InputFileReader.cpp | 3 +- src/InputReader.h | 61 --------- src/InputZeroMQReader.cpp | 323 ---------------------------------------------- src/Utils.cpp | 1 - 7 files changed, 6 insertions(+), 465 deletions(-) delete mode 100644 src/InputZeroMQReader.cpp (limited to 'src') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 9190c60..3e223c3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2023 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -113,8 +113,6 @@ static void parse_configfile( } mod_settings.inputTransport = pt.Get("input.transport", "file"); - mod_settings.inputMaxFramesQueued = pt.GetInteger("input.max_frames_queued", - ZMQ_INPUT_MAX_FRAME_QUEUE); mod_settings.edi_max_delay_ms = pt.GetReal("input.edi_max_delay", 0.0f); @@ -574,8 +572,7 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings) if (mod_settings.inputName.substr(0, 4) == "zmq+" && mod_settings.inputName.find("://") != std::string::npos) { - // if the name starts with zmq+XYZ://somewhere:port - mod_settings.inputTransport = "zeromq"; + throw std::runtime_error("Support for ZeroMQ input transport has been removed."); } else if (mod_settings.inputName.substr(0, 6) == "tcp://") { mod_settings.inputTransport = "tcp"; diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 8f2a1d2..8681175 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2023 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -40,8 +40,6 @@ #include "output/Lime.h" #include "output/BladeRF.h" -#define ZMQ_INPUT_MAX_FRAME_QUEUE 500 - struct mod_settings_t { std::string outputName; bool useZeroMQOutput = false; @@ -69,7 +67,6 @@ struct mod_settings_t { bool loop = false; std::string inputName = ""; std::string inputTransport = "file"; - unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE; float edi_max_delay_ms = 0.0f; tii_config_t tiiConfig; diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 45f4d0a..57e6e32 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -381,17 +381,6 @@ int launch_modulator(int argc, char* argv[]) inputReader = inputFileReader; } - else if (mod_settings.inputTransport == "zeromq") { -#if !defined(HAVE_ZEROMQ) - throw std::runtime_error("Unable to open input: " - "ZeroMQ input transport selected, but not compiled in!"); -#else - auto inputZeroMQReader = make_shared(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; -#endif - } else if (mod_settings.inputTransport == "tcp") { auto inputTcpReader = make_shared(); inputTcpReader->Open(mod_settings.inputName); @@ -460,17 +449,6 @@ int launch_modulator(int argc, char* argv[]) run_again = true; } } -#if defined(HAVE_ZEROMQ) - else if (auto in_zmq = dynamic_pointer_cast(inputReader)) { - run_again = true; - // Create a new input reader - rcs.remove_controllable(in_zmq.get()); - auto inputZeroMQReader = make_shared(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; - } -#endif else if (dynamic_pointer_cast(inputReader)) { // Keep the same inputReader, as there is no input buffer overflow run_again = true; @@ -500,14 +478,6 @@ int launch_modulator(int argc, char* argv[]) return ret; } -struct zmq_input_timeout : public std::exception -{ - const char* what() const throw() - { - return "InputZMQ timeout"; - } -}; - static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, modulator_data& m) { auto ret = run_modulator_state_t::failure; @@ -535,36 +505,9 @@ static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, m ret = run_modulator_state_t::normal_end; break; } -#if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast(m.inputReader)) { - /* An empty frame marks a timeout. We ignore it, but we are - * now able to handle SIGINT properly. - * - * Also, we reconnect zmq every 10 seconds to avoid some - * issues, discussed in - * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection - * - * > It is possible that the PUB socket sees the error - * > while the SUB socket does not. - * > - * > The ZMTP RFC has a proposal for heartbeating that would - * > solve this problem. The current best solution is for - * > PUB sockets to send heartbeats (e.g. 1 per second) when - * > traffic is low, and for SUB sockets to disconnect / - * > reconnect if they stop getting these. - * - * We don't need a heartbeat, because our application is constant frame rate, - * the frames themselves can act as heartbeats. - */ - - const auto now = chrono::steady_clock::now(); - if (last_frame_received + chrono::seconds(10) < now) { - throw zmq_input_timeout(); - } - } -#endif // defined(HAVE_ZEROMQ) else if (dynamic_pointer_cast(m.inputReader)) { - /* Same as for ZeroMQ */ + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. */ } else { throw logic_error("Unhandled framesize==0!"); @@ -681,16 +624,6 @@ static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, m } } } - catch (const zmq_input_timeout&) { - // The ZeroMQ input timeout - etiLog.level(warn) << "Timeout"; - ret = run_modulator_state_t::again; - } - catch (const zmq_input_overflow& e) { - // The ZeroMQ input has overflowed its buffer - etiLog.level(warn) << e.what(); - ret = run_modulator_state_t::again; - } catch (const FrameMultiplexerError& e) { // The FrameMultiplexer saw an error or a change in the size of a // subchannel. This can be due to a multiplex reconfiguration. diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 5a9780b..a6b482e 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -6,8 +6,7 @@ Copyrigth (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li - - Input module for reading the ETI data from file or pipe, or ZeroMQ. + Input module for reading the ETI data from file or pipe. Supported file formats: RAW, FRAMED, STREAMED Supports re-sync to RAW ETI file diff --git a/src/InputReader.h b/src/InputReader.h index ab45d4f..2484948 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -38,11 +38,6 @@ #include #include #include -#if defined(HAVE_ZEROMQ) -# include "zmq.hpp" -# include "ThreadsafeQueue.h" -# include "RemoteControl.h" -#endif #include "Log.h" #include "Socket.h" #define INVALID_SOCKET -1 @@ -148,60 +143,4 @@ class InputTcpReader : public InputReader std::string m_uri; }; -struct zmq_input_overflow : public std::exception -{ - const char* what () const throw () - { - return "InputZMQ buffer overflow"; - } -}; - -#if defined(HAVE_ZEROMQ) -/* A ZeroMQ input. See www.zeromq.org for more info */ - -class InputZeroMQReader : public InputReader, public RemoteControllable -{ - public: - InputZeroMQReader(); - InputZeroMQReader(const InputZeroMQReader& other) = delete; - InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; - ~InputZeroMQReader(); - - int Open(const std::string& uri, size_t max_queued_frames); - virtual int GetNextFrame(void* buffer) override; - virtual std::string GetPrintableInfo() const override; - - /* Base function to set parameters. */ - virtual void set_parameter( - const std::string& parameter, - const std::string& value) override; - - /* Getting a parameter always returns a string. */ - virtual const std::string get_parameter( - const std::string& parameter) const override; - - private: - std::atomic m_running = ATOMIC_VAR_INIT(false); - std::string m_uri; - size_t m_max_queued_frames = 0; - - // Either must contain a full ETI frame, or one flag must be set - struct message_t { - std::vector eti_frame; - bool overflow = false; - bool timeout = false; - bool fault = false; - }; - ThreadsafeQueue m_in_messages; - - mutable std::mutex m_last_in_messages_size_mutex; - size_t m_last_in_messages_size = 0; - - void RecvProcess(void); - - zmq::context_t m_zmqcontext; // is thread-safe - std::thread m_recv_thread; -}; - -#endif diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp deleted file mode 100644 index 40a07d4..0000000 --- a/src/InputZeroMQReader.cpp +++ /dev/null @@ -1,323 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 - Her Majesty the Queen in Right of Canada (Communications Research - Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://opendigitalradio.org - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see . - */ - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#if defined(HAVE_ZEROMQ) - -#include -#include -#include -#include -#include "zmq.hpp" -#include "InputReader.h" -#include "PcDebug.h" -#include "Utils.h" - -using namespace std; - -constexpr int ZMQ_TIMEOUT_MS = 100; - -#define NUM_FRAMES_PER_ZMQ_MESSAGE 4 -/* A concatenation of four ETI frames, - * whose maximal size is 6144. - * - * Four frames in one zmq message are sent, so that - * we do not risk breaking ETI vs. transmission frame - * phase. - * - * The header is followed by the four ETI frames. - */ -struct zmq_msg_header_t -{ - uint32_t version; - uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; -}; - -#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ - (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) - -InputZeroMQReader::InputZeroMQReader() : - InputReader(), - RemoteControllable("inputzmq") -{ - RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)"); -} - -InputZeroMQReader::~InputZeroMQReader() -{ - m_running = false; - // This avoids the ugly "context was terminated" error because it lets - // poll do its thing first - this_thread::sleep_for(chrono::milliseconds(2 * ZMQ_TIMEOUT_MS)); - m_zmqcontext.close(); - if (m_recv_thread.joinable()) { - m_recv_thread.join(); - } -} - -int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) -{ - // The URL might start with zmq+tcp:// - if (uri.substr(0, 4) == "zmq+") { - m_uri = uri.substr(4); - } - else { - m_uri = uri; - } - - m_max_queued_frames = max_queued_frames; - - m_running = true; - m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this); - - return 0; -} - -int InputZeroMQReader::GetNextFrame(void* buffer) -{ - if (not m_running) { - throw runtime_error("ZMQ input is not ready yet"); - } - - message_t incoming; - - /* Do some prebuffering because reads will happen in bursts - * (4 ETI frames in TM1) and we should make sure that - * we can serve the data required for a full transmission frame. - */ - if (m_in_messages.size() < 4) { - const size_t prebuffering = 10; - etiLog.log(trace, "ZMQ,wait1"); - m_in_messages.wait_and_pop(incoming, prebuffering); - } - else { - etiLog.log(trace, "ZMQ,wait2"); - m_in_messages.wait_and_pop(incoming); - } - etiLog.log(trace, "ZMQ,pop"); - - constexpr size_t framesize = 6144; - - if (incoming.timeout) { - return 0; - } - else if (incoming.fault) { - throw runtime_error("ZMQ input has terminated"); - } - else if (incoming.overflow) { - throw zmq_input_overflow(); - } - else if (incoming.eti_frame.size() == framesize) { - unique_lock lock(m_last_in_messages_size_mutex); - m_last_in_messages_size--; - lock.unlock(); - - memcpy(buffer, &incoming.eti_frame.front(), framesize); - - return framesize; - } - else { - throw logic_error("ZMQ ETI not 6144"); - } -} - -std::string InputZeroMQReader::GetPrintableInfo() const -{ - return "Input ZeroMQ: Receiving from " + m_uri; -} - -void InputZeroMQReader::RecvProcess() -{ - set_thread_name("zmqinput"); - - size_t queue_size = 0; - - zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB); - // zmq sockets are not thread safe. That's why - // we create it here, and not at object creation. - - bool success = true; - - try { - subscriber.connect(m_uri.c_str()); - } - catch (const zmq::error_t& err) { - etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << - m_uri << "': '" << err.what() << "'"; - success = false; - } - - if (success) try { - // subscribe to all messages - subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); - } - catch (const zmq::error_t& err) { - etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << - err.what() << "'"; - success = false; - } - - if (success) try { - while (m_running) { - zmq::message_t incoming; - zmq::pollitem_t items[1]; - items[0].socket = subscriber; - items[0].events = ZMQ_POLLIN; - const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); - if (num_events == 0) { - message_t msg; - msg.timeout = true; - m_in_messages.push(move(msg)); - continue; - } - - subscriber.recv(incoming); - - if (queue_size < m_max_queued_frames) { - if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { - throw runtime_error("ZeroMQ packet too small for header"); - } - else { - zmq_msg_header_t dab_msg; - memcpy(&dab_msg, incoming.data(), sizeof(zmq_msg_header_t)); - - if (dab_msg.version != 1) { - etiLog.level(error) << - "ZeroMQ wrong packet version " << - dab_msg.version; - } - - int offset = sizeof(dab_msg.version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg.buflen); - - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg.buflen[i] > 6144) { - stringstream ss; - ss << "ZeroMQ buffer " << i << - " has invalid buflen " << dab_msg.buflen[i]; - throw runtime_error(ss.str()); - } - else { - vector buf(6144, 0x55); - - const int framesize = dab_msg.buflen[i]; - - if ((ssize_t)incoming.size() < offset + framesize) { - throw runtime_error("ZeroMQ packet too small"); - } - - memcpy(&buf.front(), - ((uint8_t*)incoming.data()) + offset, - framesize); - - offset += framesize; - - message_t msg; - msg.eti_frame = move(buf); - queue_size = m_in_messages.push(move(msg)); - etiLog.log(trace, "ZMQ,push %zu", queue_size); - - unique_lock lock(m_last_in_messages_size_mutex); - m_last_in_messages_size++; - } - } - } - } - else { - message_t msg; - msg.overflow = true; - queue_size = m_in_messages.push(move(msg)); - etiLog.level(warn) << "ZeroMQ buffer overfull !"; - throw runtime_error("ZMQ input full"); - } - - if (queue_size < 5) { - etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; - } - } - } - catch (const zmq::error_t& err) { - etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'"; - } - catch (const std::exception& err) { - etiLog.level(error) << "Exception during receive: '" << err.what() << "'"; - } - - m_running = false; - - etiLog.level(info) << "ZeroMQ input worker terminated"; - - subscriber.close(); - - message_t msg; - msg.fault = true; - queue_size = m_in_messages.push(move(msg)); -} - -// ======================================= -// Remote Control -// ======================================= -void InputZeroMQReader::set_parameter(const string& parameter, const string& value) -{ - stringstream ss(value); - ss.exceptions ( stringstream::failbit | stringstream::badbit ); - - if (parameter == "buffer") { - throw ParameterError("Parameter " + parameter + " is read-only."); - } - else { - stringstream ss_err; - ss_err << "Parameter '" << parameter - << "' is not exported by controllable " << get_rc_name(); - throw ParameterError(ss_err.str()); - } -} - -const string InputZeroMQReader::get_parameter(const string& parameter) const -{ - stringstream ss; - ss << std::fixed; - if (parameter == "buffer") { - // Do not use size of the queue, as it will contain empty - // frames to signal timeouts - unique_lock lock(m_last_in_messages_size_mutex); - const long time_in_buffer_us = 24000 * m_last_in_messages_size; - ss << time_in_buffer_us; - } - else { - ss << "Parameter '" << parameter << - "' is not exported by controllable " << get_rc_name(); - throw ParameterError(ss.str()); - } - return ss.str(); -} - -#endif - diff --git a/src/Utils.cpp b/src/Utils.cpp index 350838e..3f378a7 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -90,7 +90,6 @@ void printUsage(const char* progName) fprintf(out, "Where:\n"); fprintf(out, "input: ETI input filename (default: stdin), or\n"); fprintf(out, " tcp://source:port for ETI-over-TCP input, or\n"); - fprintf(out, " zmq+tcp://source:port for ZMQ input.\n"); fprintf(out, " udp://:port for EDI input.\n"); fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n"); fprintf(out, "-F format: Set the output format (see doc/example.ini for formats) for the file output.\n"); -- cgit v1.2.3