aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2023-02-01 14:03:03 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2023-02-01 14:03:03 +0100
commit0aec6da11b4add62ac473e3f4ea813bb4a8a556d (patch)
tree720c3903f86896895d303240181dd85799ae78c1 /src
parentbf6e05a427e050ec54b9da91da8ac04f52fa006c (diff)
downloaddabmod-0aec6da11b4add62ac473e3f4ea813bb4a8a556d.tar.gz
dabmod-0aec6da11b4add62ac473e3f4ea813bb4a8a556d.tar.bz2
dabmod-0aec6da11b4add62ac473e3f4ea813bb4a8a556d.zip
Remove ZeroMQ input
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp7
-rw-r--r--src/ConfigParser.h5
-rw-r--r--src/DabMod.cpp71
-rw-r--r--src/InputFileReader.cpp3
-rw-r--r--src/InputReader.h61
-rw-r--r--src/InputZeroMQReader.cpp323
-rw-r--r--src/Utils.cpp1
7 files changed, 6 insertions, 465 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 9190c60..3e223c3 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -113,8 +113,6 @@ static void parse_configfile(
}
mod_settings.inputTransport = pt.Get("input.transport", "file");
- mod_settings.inputMaxFramesQueued = pt.GetInteger("input.max_frames_queued",
- ZMQ_INPUT_MAX_FRAME_QUEUE);
mod_settings.edi_max_delay_ms = pt.GetReal("input.edi_max_delay", 0.0f);
@@ -574,8 +572,7 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)
if (mod_settings.inputName.substr(0, 4) == "zmq+" &&
mod_settings.inputName.find("://") != std::string::npos) {
- // if the name starts with zmq+XYZ://somewhere:port
- mod_settings.inputTransport = "zeromq";
+ throw std::runtime_error("Support for ZeroMQ input transport has been removed.");
}
else if (mod_settings.inputName.substr(0, 6) == "tcp://") {
mod_settings.inputTransport = "tcp";
diff --git a/src/ConfigParser.h b/src/ConfigParser.h
index 8f2a1d2..8681175 100644
--- a/src/ConfigParser.h
+++ b/src/ConfigParser.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -40,8 +40,6 @@
#include "output/Lime.h"
#include "output/BladeRF.h"
-#define ZMQ_INPUT_MAX_FRAME_QUEUE 500
-
struct mod_settings_t {
std::string outputName;
bool useZeroMQOutput = false;
@@ -69,7 +67,6 @@ struct mod_settings_t {
bool loop = false;
std::string inputName = "";
std::string inputTransport = "file";
- unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE;
float edi_max_delay_ms = 0.0f;
tii_config_t tiiConfig;
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 45f4d0a..57e6e32 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -381,17 +381,6 @@ int launch_modulator(int argc, char* argv[])
inputReader = inputFileReader;
}
- else if (mod_settings.inputTransport == "zeromq") {
-#if !defined(HAVE_ZEROMQ)
- throw std::runtime_error("Unable to open input: "
- "ZeroMQ input transport selected, but not compiled in!");
-#else
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
-#endif
- }
else if (mod_settings.inputTransport == "tcp") {
auto inputTcpReader = make_shared<InputTcpReader>();
inputTcpReader->Open(mod_settings.inputName);
@@ -460,17 +449,6 @@ int launch_modulator(int argc, char* argv[])
run_again = true;
}
}
-#if defined(HAVE_ZEROMQ)
- else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
- run_again = true;
- // Create a new input reader
- rcs.remove_controllable(in_zmq.get());
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
- }
-#endif
else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
// Keep the same inputReader, as there is no input buffer overflow
run_again = true;
@@ -500,14 +478,6 @@ int launch_modulator(int argc, char* argv[])
return ret;
}
-struct zmq_input_timeout : public std::exception
-{
- const char* what() const throw()
- {
- return "InputZMQ timeout";
- }
-};
-
static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
@@ -535,36 +505,9 @@ static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, m
ret = run_modulator_state_t::normal_end;
break;
}
-#if defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
- /* An empty frame marks a timeout. We ignore it, but we are
- * now able to handle SIGINT properly.
- *
- * Also, we reconnect zmq every 10 seconds to avoid some
- * issues, discussed in
- * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
- *
- * > It is possible that the PUB socket sees the error
- * > while the SUB socket does not.
- * >
- * > The ZMTP RFC has a proposal for heartbeating that would
- * > solve this problem. The current best solution is for
- * > PUB sockets to send heartbeats (e.g. 1 per second) when
- * > traffic is low, and for SUB sockets to disconnect /
- * > reconnect if they stop getting these.
- *
- * We don't need a heartbeat, because our application is constant frame rate,
- * the frames themselves can act as heartbeats.
- */
-
- const auto now = chrono::steady_clock::now();
- if (last_frame_received + chrono::seconds(10) < now) {
- throw zmq_input_timeout();
- }
- }
-#endif // defined(HAVE_ZEROMQ)
else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
- /* Same as for ZeroMQ */
+ /* An empty frame marks a timeout. We ignore it, but we are
+ * now able to handle SIGINT properly. */
}
else {
throw logic_error("Unhandled framesize==0!");
@@ -681,16 +624,6 @@ static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, m
}
}
}
- catch (const zmq_input_timeout&) {
- // The ZeroMQ input timeout
- etiLog.level(warn) << "Timeout";
- ret = run_modulator_state_t::again;
- }
- catch (const zmq_input_overflow& e) {
- // The ZeroMQ input has overflowed its buffer
- etiLog.level(warn) << e.what();
- ret = run_modulator_state_t::again;
- }
catch (const FrameMultiplexerError& e) {
// The FrameMultiplexer saw an error or a change in the size of a
// subchannel. This can be due to a multiplex reconfiguration.
diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp
index 5a9780b..a6b482e 100644
--- a/src/InputFileReader.cpp
+++ b/src/InputFileReader.cpp
@@ -6,8 +6,7 @@
Copyrigth (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
-
- Input module for reading the ETI data from file or pipe, or ZeroMQ.
+ Input module for reading the ETI data from file or pipe.
Supported file formats: RAW, FRAMED, STREAMED
Supports re-sync to RAW ETI file
diff --git a/src/InputReader.h b/src/InputReader.h
index ab45d4f..2484948 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -38,11 +38,6 @@
#include <memory>
#include <thread>
#include <unistd.h>
-#if defined(HAVE_ZEROMQ)
-# include "zmq.hpp"
-# include "ThreadsafeQueue.h"
-# include "RemoteControl.h"
-#endif
#include "Log.h"
#include "Socket.h"
#define INVALID_SOCKET -1
@@ -148,60 +143,4 @@ class InputTcpReader : public InputReader
std::string m_uri;
};
-struct zmq_input_overflow : public std::exception
-{
- const char* what () const throw ()
- {
- return "InputZMQ buffer overflow";
- }
-};
-
-#if defined(HAVE_ZEROMQ)
-/* A ZeroMQ input. See www.zeromq.org for more info */
-
-class InputZeroMQReader : public InputReader, public RemoteControllable
-{
- public:
- InputZeroMQReader();
- InputZeroMQReader(const InputZeroMQReader& other) = delete;
- InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
- ~InputZeroMQReader();
-
- int Open(const std::string& uri, size_t max_queued_frames);
- virtual int GetNextFrame(void* buffer) override;
- virtual std::string GetPrintableInfo() const override;
-
- /* Base function to set parameters. */
- virtual void set_parameter(
- const std::string& parameter,
- const std::string& value) override;
-
- /* Getting a parameter always returns a string. */
- virtual const std::string get_parameter(
- const std::string& parameter) const override;
-
- private:
- std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
- std::string m_uri;
- size_t m_max_queued_frames = 0;
-
- // Either must contain a full ETI frame, or one flag must be set
- struct message_t {
- std::vector<uint8_t> eti_frame;
- bool overflow = false;
- bool timeout = false;
- bool fault = false;
- };
- ThreadsafeQueue<message_t> m_in_messages;
-
- mutable std::mutex m_last_in_messages_size_mutex;
- size_t m_last_in_messages_size = 0;
-
- void RecvProcess(void);
-
- zmq::context_t m_zmqcontext; // is thread-safe
- std::thread m_recv_thread;
-};
-
-#endif
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
deleted file mode 100644
index 40a07d4..0000000
--- a/src/InputZeroMQReader.cpp
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#if defined(HAVE_ZEROMQ)
-
-#include <string>
-#include <cstring>
-#include <cstdio>
-#include <stdint.h>
-#include "zmq.hpp"
-#include "InputReader.h"
-#include "PcDebug.h"
-#include "Utils.h"
-
-using namespace std;
-
-constexpr int ZMQ_TIMEOUT_MS = 100;
-
-#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
-/* A concatenation of four ETI frames,
- * whose maximal size is 6144.
- *
- * Four frames in one zmq message are sent, so that
- * we do not risk breaking ETI vs. transmission frame
- * phase.
- *
- * The header is followed by the four ETI frames.
- */
-struct zmq_msg_header_t
-{
- uint32_t version;
- uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE];
-};
-
-#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \
- (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t))
-
-InputZeroMQReader::InputZeroMQReader() :
- InputReader(),
- RemoteControllable("inputzmq")
-{
- RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)");
-}
-
-InputZeroMQReader::~InputZeroMQReader()
-{
- m_running = false;
- // This avoids the ugly "context was terminated" error because it lets
- // poll do its thing first
- this_thread::sleep_for(chrono::milliseconds(2 * ZMQ_TIMEOUT_MS));
- m_zmqcontext.close();
- if (m_recv_thread.joinable()) {
- m_recv_thread.join();
- }
-}
-
-int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
-{
- // The URL might start with zmq+tcp://
- if (uri.substr(0, 4) == "zmq+") {
- m_uri = uri.substr(4);
- }
- else {
- m_uri = uri;
- }
-
- m_max_queued_frames = max_queued_frames;
-
- m_running = true;
- m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this);
-
- return 0;
-}
-
-int InputZeroMQReader::GetNextFrame(void* buffer)
-{
- if (not m_running) {
- throw runtime_error("ZMQ input is not ready yet");
- }
-
- message_t incoming;
-
- /* Do some prebuffering because reads will happen in bursts
- * (4 ETI frames in TM1) and we should make sure that
- * we can serve the data required for a full transmission frame.
- */
- if (m_in_messages.size() < 4) {
- const size_t prebuffering = 10;
- etiLog.log(trace, "ZMQ,wait1");
- m_in_messages.wait_and_pop(incoming, prebuffering);
- }
- else {
- etiLog.log(trace, "ZMQ,wait2");
- m_in_messages.wait_and_pop(incoming);
- }
- etiLog.log(trace, "ZMQ,pop");
-
- constexpr size_t framesize = 6144;
-
- if (incoming.timeout) {
- return 0;
- }
- else if (incoming.fault) {
- throw runtime_error("ZMQ input has terminated");
- }
- else if (incoming.overflow) {
- throw zmq_input_overflow();
- }
- else if (incoming.eti_frame.size() == framesize) {
- unique_lock<mutex> lock(m_last_in_messages_size_mutex);
- m_last_in_messages_size--;
- lock.unlock();
-
- memcpy(buffer, &incoming.eti_frame.front(), framesize);
-
- return framesize;
- }
- else {
- throw logic_error("ZMQ ETI not 6144");
- }
-}
-
-std::string InputZeroMQReader::GetPrintableInfo() const
-{
- return "Input ZeroMQ: Receiving from " + m_uri;
-}
-
-void InputZeroMQReader::RecvProcess()
-{
- set_thread_name("zmqinput");
-
- size_t queue_size = 0;
-
- zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB);
- // zmq sockets are not thread safe. That's why
- // we create it here, and not at object creation.
-
- bool success = true;
-
- try {
- subscriber.connect(m_uri.c_str());
- }
- catch (const zmq::error_t& err) {
- etiLog.level(error) << "Failed to connect ZeroMQ socket to '" <<
- m_uri << "': '" << err.what() << "'";
- success = false;
- }
-
- if (success) try {
- // subscribe to all messages
- subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
- }
- catch (const zmq::error_t& err) {
- etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" <<
- err.what() << "'";
- success = false;
- }
-
- if (success) try {
- while (m_running) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = subscriber;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) {
- message_t msg;
- msg.timeout = true;
- m_in_messages.push(move(msg));
- continue;
- }
-
- subscriber.recv(incoming);
-
- if (queue_size < m_max_queued_frames) {
- if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) {
- throw runtime_error("ZeroMQ packet too small for header");
- }
- else {
- zmq_msg_header_t dab_msg;
- memcpy(&dab_msg, incoming.data(), sizeof(zmq_msg_header_t));
-
- if (dab_msg.version != 1) {
- etiLog.level(error) <<
- "ZeroMQ wrong packet version " <<
- dab_msg.version;
- }
-
- int offset = sizeof(dab_msg.version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg.buflen);
-
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg.buflen[i] > 6144) {
- stringstream ss;
- ss << "ZeroMQ buffer " << i <<
- " has invalid buflen " << dab_msg.buflen[i];
- throw runtime_error(ss.str());
- }
- else {
- vector<uint8_t> buf(6144, 0x55);
-
- const int framesize = dab_msg.buflen[i];
-
- if ((ssize_t)incoming.size() < offset + framesize) {
- throw runtime_error("ZeroMQ packet too small");
- }
-
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
-
- offset += framesize;
-
- message_t msg;
- msg.eti_frame = move(buf);
- queue_size = m_in_messages.push(move(msg));
- etiLog.log(trace, "ZMQ,push %zu", queue_size);
-
- unique_lock<mutex> lock(m_last_in_messages_size_mutex);
- m_last_in_messages_size++;
- }
- }
- }
- }
- else {
- message_t msg;
- msg.overflow = true;
- queue_size = m_in_messages.push(move(msg));
- etiLog.level(warn) << "ZeroMQ buffer overfull !";
- throw runtime_error("ZMQ input full");
- }
-
- if (queue_size < 5) {
- etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !";
- }
- }
- }
- catch (const zmq::error_t& err) {
- etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'";
- }
- catch (const std::exception& err) {
- etiLog.level(error) << "Exception during receive: '" << err.what() << "'";
- }
-
- m_running = false;
-
- etiLog.level(info) << "ZeroMQ input worker terminated";
-
- subscriber.close();
-
- message_t msg;
- msg.fault = true;
- queue_size = m_in_messages.push(move(msg));
-}
-
-// =======================================
-// Remote Control
-// =======================================
-void InputZeroMQReader::set_parameter(const string& parameter, const string& value)
-{
- stringstream ss(value);
- ss.exceptions ( stringstream::failbit | stringstream::badbit );
-
- if (parameter == "buffer") {
- throw ParameterError("Parameter " + parameter + " is read-only.");
- }
- else {
- stringstream ss_err;
- ss_err << "Parameter '" << parameter
- << "' is not exported by controllable " << get_rc_name();
- throw ParameterError(ss_err.str());
- }
-}
-
-const string InputZeroMQReader::get_parameter(const string& parameter) const
-{
- stringstream ss;
- ss << std::fixed;
- if (parameter == "buffer") {
- // Do not use size of the queue, as it will contain empty
- // frames to signal timeouts
- unique_lock<mutex> lock(m_last_in_messages_size_mutex);
- const long time_in_buffer_us = 24000 * m_last_in_messages_size;
- ss << time_in_buffer_us;
- }
- else {
- ss << "Parameter '" << parameter <<
- "' is not exported by controllable " << get_rc_name();
- throw ParameterError(ss.str());
- }
- return ss.str();
-}
-
-#endif
-
diff --git a/src/Utils.cpp b/src/Utils.cpp
index 350838e..3f378a7 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -90,7 +90,6 @@ void printUsage(const char* progName)
fprintf(out, "Where:\n");
fprintf(out, "input: ETI input filename (default: stdin), or\n");
fprintf(out, " tcp://source:port for ETI-over-TCP input, or\n");
- fprintf(out, " zmq+tcp://source:port for ZMQ input.\n");
fprintf(out, " udp://:port for EDI input.\n");
fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n");
fprintf(out, "-F format: Set the output format (see doc/example.ini for formats) for the file output.\n");