aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-09-05 11:01:56 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-09-05 11:01:56 +0200
commit32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c (patch)
tree59cd09a0eaa6469ba7aed1b9b167cfc8643ec68f
parent0c532ab20b47d661f098cc6089184303dce65d56 (diff)
downloaddabmod-32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c.tar.gz
dabmod-32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c.tar.bz2
dabmod-32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c.zip
Rework EDI input
- Use same main loop for both ETI and EDI inputs - Test SFN functionality with EDI input - Add log.show_process_time setting for process time printout
-rw-r--r--.travis.yml7
-rw-r--r--INSTALL1
-rw-r--r--Makefile.am22
-rw-r--r--configure.ac8
-rw-r--r--doc/example.ini3
-rw-r--r--src/ConfigParser.cpp2
-rw-r--r--src/ConfigParser.h1
-rw-r--r--src/DabMod.cpp480
-rw-r--r--src/DabModulator.cpp2
-rw-r--r--src/EtiReader.cpp14
-rw-r--r--src/EtiReader.h11
-rw-r--r--src/Flowgraph.cpp9
-rw-r--r--src/Flowgraph.h7
-rw-r--r--src/OutputMemory.cpp1
-rw-r--r--src/Utils.cpp3
15 files changed, 274 insertions, 297 deletions
diff --git a/.travis.yml b/.travis.yml
index 990f0a9..ae8c4ef 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -47,13 +47,6 @@ matrix:
compiler: gcc
addons: *linuxaddons
- - env: MATRIX_EVAL="CC=gcc-9 CXX=g++-9" CONF="--disable-output-uhd --enable-edi"
- os: linux
- dist: xenial
- sudo: required
- compiler: gcc
- addons: *linuxaddons
-
- env: MATRIX_EVAL="CC=gcc-9 CXX=g++-9" CONF="--disable-output-uhd --enable-trace"
os: linux
dist: xenial
diff --git a/INSTALL b/INSTALL
index 0132f1c..03ddda1 100644
--- a/INSTALL
+++ b/INSTALL
@@ -32,7 +32,6 @@ The configure script can be launch with a variety of options:
This is meant for distribution package maintainers who want to
use their own march option, and for people running into compilation
issues due to -march=native. (e.g. GCC bug 70132 on ARM systems)
- --enable-edi Enable the EDI input.
Debugging options: You should not enable any debug option if you need good performance.
--enable-trace Create debugging files for each DSP block for data analysis
diff --git a/Makefile.am b/Makefile.am
index 34d2d91..9644c16 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -114,7 +114,16 @@ odr_dabmod_SOURCES = src/DabMod.cpp \
lib/fec/fec.h \
lib/fec/init_rs_char.c \
lib/fec/init_rs.h \
- lib/fec/rs-common.h
+ lib/fec/rs-common.h \
+ lib/edi/buffer_unpack.hpp \
+ lib/edi/common.hpp \
+ lib/edi/common.cpp \
+ lib/edi/eti.hpp \
+ lib/edi/eti.cpp \
+ lib/edi/ETIDecoder.hpp \
+ lib/edi/ETIDecoder.cpp \
+ lib/edi/PFT.hpp \
+ lib/edi/PFT.cpp
if !COMPILE_FOR_EASYDABV3
odr_dabmod_SOURCES += \
@@ -161,14 +170,3 @@ odr_dabmod_SOURCES += \
odr_dabmod_LDADD += $(UHD_LIBS)
endif
-if COMPILE_EDI
-odr_dabmod_SOURCES += lib/edi/buffer_unpack.hpp \
- lib/edi/common.hpp \
- lib/edi/common.cpp \
- lib/edi/eti.hpp \
- lib/edi/eti.cpp \
- lib/edi/ETIDecoder.hpp \
- lib/edi/ETIDecoder.cpp \
- lib/edi/PFT.hpp \
- lib/edi/PFT.cpp
-endif
diff --git a/configure.ac b/configure.ac
index 270a3ff..7e05620 100644
--- a/configure.ac
+++ b/configure.ac
@@ -49,9 +49,6 @@ AC_ARG_ENABLE([trace],
AC_ARG_ENABLE([zeromq],
[AS_HELP_STRING([--disable-zeromq], [Disable ZeroMQ input, output and remote control])],
[], [enable_zeromq=yes])
-AC_ARG_ENABLE([edi],
- [AS_HELP_STRING([--enable-edi], [Enable EDI input])],
- [], [enable_edi=no])
AC_ARG_ENABLE([native],
[AS_HELP_STRING([--disable-native], [Do not compile with -march=native])],
[], [enable_native=yes])
@@ -124,11 +121,6 @@ AS_IF([test "x$enable_output_uhd" = "xyes"],
AS_IF([test "x$enable_soapysdr" = "xyes"],
[AC_DEFINE(HAVE_SOAPYSDR, [1], [Define if SoapySDR output is enabled])])
-AS_IF([test "x$enable_edi" = "xyes"],
- [AC_DEFINE(HAVE_EDI, [1], [Define if EDI input is enabled]) ])
-
-AM_CONDITIONAL([COMPILE_EDI], [test "x$enable_edi" = "xyes"])
-
AS_IF([test "x$enable_easydabv3" = "xyes"],
AC_DEFINE(BUILD_FOR_EASYDABV3, [1], [Define if we are building for EasyDABv3]))
diff --git a/doc/example.ini b/doc/example.ini
index b3e2eb3..72e3386 100644
--- a/doc/example.ini
+++ b/doc/example.ini
@@ -22,6 +22,9 @@ syslog=0
filelog=0
filename=odr-dabmod.log
+; If you don't want to see the flowgraph processing time, set:
+;show_process_time=0
+
[input]
; A file or fifo input is using transport=file
transport=file
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 80103c4..d5d1995 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -144,6 +144,8 @@ static void parse_configfile(
etiLog.register_backend(make_shared<LogTracer>(trace_filename));
}
+ mod_settings.showProcessTime = pt.GetInteger("log.show_process_time",
+ mod_settings.showProcessTime);
// modulator parameters:
const string gainMode_setting = pt.Get("modulator.gainmode", "var");
diff --git a/src/ConfigParser.h b/src/ConfigParser.h
index ee961fa..7e706c0 100644
--- a/src/ConfigParser.h
+++ b/src/ConfigParser.h
@@ -86,6 +86,7 @@ struct mod_settings_t {
Output::SDRDeviceConfig sdr_device_config;
#endif
+ bool showProcessTime = true;
};
void parse_args(int argc, char **argv, mod_settings_t& mod_settings);
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 81882e4..922f9e4 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -59,7 +59,6 @@
#include "OutputZeroMQ.h"
#include "InputReader.h"
#include "PcDebug.h"
-#include "TimestampDecoder.h"
#include "FIRFilter.h"
#include "RemoteControl.h"
#include "ConfigParser.h"
@@ -94,12 +93,16 @@ void signalHandler(int signalNb)
struct modulator_data
{
+ // For ETI
std::shared_ptr<InputReader> inputReader;
- Buffer data;
- uint64_t framecount = 0;
+ std::shared_ptr<EtiReader> etiReader;
+
+ // For EDI
+ std::shared_ptr<EdiInput> ediInput;
- Flowgraph* flowgraph = nullptr;
- EtiReader* etiReader = nullptr;
+ // Common to both EDI and EDI
+ uint64_t framecount = 0;
+ Flowgraph *flowgraph = nullptr;
};
enum class run_modulator_state_t {
@@ -299,205 +302,137 @@ int launch_modulator(int argc, char* argv[])
etiLog.level(error) << "Could not set priority for modulator:" << r;
}
+ shared_ptr<InputReader> inputReader;
+ shared_ptr<EdiInput> ediInput;
+
if (mod_settings.inputTransport == "edi") {
-#ifdef HAVE_EDI
- EdiReader ediReader(mod_settings.tist_offset_s);
- EdiDecoder::ETIDecoder ediInput(ediReader, false);
- if (mod_settings.edi_max_delay_ms > 0.0f) {
- // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
- ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f));
- }
- EdiTransport ediTransport(ediInput);
+ ediInput = make_shared<EdiInput>(mod_settings.tist_offset_s, mod_settings.edi_max_delay_ms);
- ediTransport.Open(mod_settings.inputName);
- if (not ediTransport.isEnabled()) {
+ ediInput->ediTransport.Open(mod_settings.inputName);
+ if (not ediInput->ediTransport.isEnabled()) {
throw runtime_error("inputTransport is edi, but ediTransport is not enabled");
}
- Flowgraph flowgraph;
-
- auto modulator = make_shared<DabModulator>(ediReader, mod_settings);
- rcs.enrol(modulator.get());
+ }
+ else if (mod_settings.inputTransport == "file") {
+ auto inputFileReader = make_shared<InputFileReader>();
- if (format_converter) {
- flowgraph.connect(modulator, format_converter);
- flowgraph.connect(format_converter, output);
- }
- else {
- flowgraph.connect(modulator, output);
+ // Opening ETI input file
+ if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) {
+ throw std::runtime_error("Unable to open input");
}
- size_t framecount = 0;
-
- bool first_frame = true;
-
- auto frame_received_tp = chrono::steady_clock::now();
-
- while (running) {
- while (running and not ediReader.isFrameReady()) {
- try {
- bool packet_received = ediTransport.rxPacket();
- if (packet_received) {
- frame_received_tp = chrono::steady_clock::now();
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(warn) << "EDI input: " << e.what();
- running = 0;
- break;
- }
-
- if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) {
- etiLog.level(error) << "No EDI data received in 10 seconds.";
- running = 0;
- break;
- }
- }
-
- if (not running) {
- break;
- }
-
- if (first_frame) {
- if (ediReader.getFp() != 0) {
- // Do not start the flowgraph before we get to FP 0
- // to ensure all blocks are properly aligned.
- ediReader.clearFrame();
- continue;
- }
- else {
- first_frame = false;
- }
- }
-
- framecount++;
- flowgraph.run();
- ediReader.clearFrame();
-
- /* Check every once in a while if the remote control
- * is still working */
- if ((framecount % 250) == 0) {
- rcs.check_faults();
- }
- }
-#else
+ inputReader = inputFileReader;
+ }
+ else if (mod_settings.inputTransport == "zeromq") {
+#if !defined(HAVE_ZEROMQ)
throw std::runtime_error("Unable to open input: "
- "EDI input transport selected, but not compiled in!");
-#endif // HAVE_EDI
+ "ZeroMQ input transport selected, but not compiled in!");
+#else
+ auto inputZeroMQReader = make_shared<InputZeroMQReader>();
+ inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
+ inputReader = inputZeroMQReader;
+#endif
+ }
+ else if (mod_settings.inputTransport == "tcp") {
+ auto inputTcpReader = make_shared<InputTcpReader>();
+ inputTcpReader->Open(mod_settings.inputName);
+ inputReader = inputTcpReader;
}
else {
- shared_ptr<InputReader> inputReader;
+ throw std::runtime_error("Unable to open input: "
+ "invalid input transport " + mod_settings.inputTransport + " selected!");
+ }
- if (mod_settings.inputTransport == "file") {
- auto inputFileReader = make_shared<InputFileReader>();
+ bool run_again = true;
- // Opening ETI input file
- if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) {
- throw std::runtime_error("Unable to open input");
- }
+ while (run_again) {
+ Flowgraph flowgraph(mod_settings.showProcessTime);
- inputReader = inputFileReader;
+ modulator_data m;
+ m.ediInput = ediInput;
+ m.inputReader = inputReader;
+ m.flowgraph = &flowgraph;
+
+ shared_ptr<DabModulator> modulator;
+ if (inputReader) {
+ m.etiReader = make_shared<EtiReader>(mod_settings.tist_offset_s);
+ modulator = make_shared<DabModulator>(*m.etiReader, mod_settings);
}
- else if (mod_settings.inputTransport == "zeromq") {
-#if !defined(HAVE_ZEROMQ)
- throw std::runtime_error("Unable to open input: "
- "ZeroMQ input transport selected, but not compiled in!");
-#else
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
-#endif
+ else if (ediInput) {
+ modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings);
}
- else if (mod_settings.inputTransport == "tcp") {
- auto inputTcpReader = make_shared<InputTcpReader>();
- inputTcpReader->Open(mod_settings.inputName);
- inputReader = inputTcpReader;
+
+ rcs.enrol(modulator.get());
+
+ if (format_converter) {
+ flowgraph.connect(modulator, format_converter);
+ flowgraph.connect(format_converter, output);
}
else {
- throw std::runtime_error("Unable to open input: "
- "invalid input transport " + mod_settings.inputTransport + " selected!");
+ flowgraph.connect(modulator, output);
}
- bool run_again = true;
-
- while (run_again) {
- Flowgraph flowgraph;
-
- modulator_data m;
- m.inputReader = inputReader;
- m.flowgraph = &flowgraph;
- m.data.setLength(6144);
-
- EtiReader etiReader(mod_settings.tist_offset_s);
- m.etiReader = &etiReader;
-
- auto input = make_shared<InputMemory>(&m.data);
- auto modulator = make_shared<DabModulator>(etiReader, mod_settings);
- rcs.enrol(modulator.get());
-
- if (format_converter) {
- flowgraph.connect(modulator, format_converter);
- flowgraph.connect(format_converter, output);
- }
- else {
- flowgraph.connect(modulator, output);
- }
-
+ if (inputReader) {
etiLog.level(info) << inputReader->GetPrintableInfo();
+ }
- run_modulator_state_t st = run_modulator(m);
- etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
-
- switch (st) {
- case run_modulator_state_t::failure:
- etiLog.level(error) << "Modulator failure.";
- run_again = false;
- ret = 1;
- break;
- case run_modulator_state_t::again:
- etiLog.level(warn) << "Restart modulator.";
- run_again = false;
- if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) {
- if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) {
- etiLog.level(error) << "Unable to open input file!";
- ret = 1;
- }
- else {
- run_again = true;
- }
+ run_modulator_state_t st = run_modulator(m);
+ etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
+
+ switch (st) {
+ case run_modulator_state_t::failure:
+ etiLog.level(error) << "Modulator failure.";
+ run_again = false;
+ ret = 1;
+ break;
+ case run_modulator_state_t::again:
+ etiLog.level(warn) << "Restart modulator.";
+ run_again = false;
+ if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) {
+ if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) {
+ etiLog.level(error) << "Unable to open input file!";
+ ret = 1;
}
-#if defined(HAVE_ZEROMQ)
- else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
+ else {
run_again = true;
- // Create a new input reader
- rcs.remove_controllable(in_zmq.get());
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
}
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
+ run_again = true;
+ // Create a new input reader
+ rcs.remove_controllable(in_zmq.get());
+ auto inputZeroMQReader = make_shared<InputZeroMQReader>();
+ inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
+ inputReader = inputZeroMQReader;
+ }
#endif
- else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
- // Keep the same inputReader, as there is no input buffer overflow
- run_again = true;
- }
- break;
- case run_modulator_state_t::reconfigure:
- etiLog.level(warn) << "Detected change in ensemble configuration.";
- /* We can keep the input in this care */
+ else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
+ // Keep the same inputReader, as there is no input buffer overflow
run_again = true;
- break;
- case run_modulator_state_t::normal_end:
- default:
- etiLog.level(info) << "modulator stopped.";
- ret = 0;
- run_again = false;
- break;
- }
-
- etiLog.level(info) << m.framecount << " DAB frames encoded";
- etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
+ }
+ else if (ediInput) {
+ // In EDI, keep the same input
+ run_again = true;
+ }
+ break;
+ case run_modulator_state_t::reconfigure:
+ etiLog.level(warn) << "Detected change in ensemble configuration.";
+ /* We can keep the input in this case */
+ run_again = true;
+ break;
+ case run_modulator_state_t::normal_end:
+ default:
+ etiLog.level(info) << "modulator stopped.";
+ ret = 0;
+ run_again = false;
+ break;
}
+
+ etiLog.level(info) << m.framecount << " DAB frames encoded";
+ etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
}
etiLog.level(info) << "Terminating";
@@ -516,57 +451,142 @@ static run_modulator_state_t run_modulator(modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
try {
- bool first_frame = true;
int last_eti_fct = -1;
auto last_frame_received = chrono::steady_clock::now();
+ Buffer data;
+ if (m.inputReader) {
+ data.setLength(6144);
+ }
while (running) {
- int framesize;
-
- PDEBUG("*****************************************\n");
- PDEBUG("* Starting main loop\n");
- PDEBUG("*****************************************\n");
- while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) {
- if (!running) {
- break;
- }
+ while (true) {
+ unsigned fct = 0;
+ unsigned fp = 0;
+
+ /* Load ETI data from the source */
+ if (m.inputReader) {
+ int framesize = m.inputReader->GetNextFrame(data.getData());
+
+ if (framesize == 0) {
+ if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
+ etiLog.level(info) << "End of file reached.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ break;
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
+ /* An empty frame marks a timeout. We ignore it, but we are
+ * now able to handle SIGINT properly.
+ *
+ * Also, we reconnect zmq every 10 seconds to avoid some
+ * issues, discussed in
+ * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
+ *
+ * > It is possible that the PUB socket sees the error
+ * > while the SUB socket does not.
+ * >
+ * > The ZMTP RFC has a proposal for heartbeating that would
+ * > solve this problem. The current best solution is for
+ * > PUB sockets to send heartbeats (e.g. 1 per second) when
+ * > traffic is low, and for SUB sockets to disconnect /
+ * > reconnect if they stop getting these.
+ *
+ * We don't need a heartbeat, because our application is constant frame rate,
+ * the frames themselves can act as heartbeats.
+ */
+
+ const auto now = chrono::steady_clock::now();
+ if (last_frame_received + chrono::seconds(10) < now) {
+ throw zmq_input_timeout();
+ }
+ }
+#endif // defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
+ /* Same as for ZeroMQ */
+ }
+ else {
+ throw logic_error("Unhandled framesize==0!");
+ }
+ }
+ else if (framesize < 0) {
+ etiLog.level(error) << "Input read error.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ break;
+ }
+
+ const int eti_bytes_read = m.etiReader->loadEtiData(data);
+ if ((size_t)eti_bytes_read != data.getLength()) {
+ etiLog.level(error) << "ETI frame incompletely read";
+ throw std::runtime_error("ETI read error");
+ }
- last_frame_received = chrono::steady_clock::now();
+ fct = m.etiReader->getFct();
+ fp = m.etiReader->getFp();
+ }
+ else if (m.ediInput) {
+ while (running and not m.ediInput->ediReader.isFrameReady()) {
+ try {
+ bool packet_received = m.ediInput->ediTransport.rxPacket();
+ if (packet_received) {
+ last_frame_received = chrono::steady_clock::now();
+ }
+ }
+ catch (const std::runtime_error& e) {
+ etiLog.level(warn) << "EDI input: " << e.what();
+ running = 0;
+ break;
+ }
- m.framecount++;
+ if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) {
+ etiLog.level(error) << "No EDI data received in 10 seconds.";
+ running = 0;
+ break;
+ }
+ }
- PDEBUG("*****************************************\n");
- PDEBUG("* Read frame %lu\n", m.framecount);
- PDEBUG("*****************************************\n");
+ if (!running) {
+ break;
+ }
- const int eti_bytes_read = m.etiReader->loadEtiData(m.data);
- if ((size_t)eti_bytes_read != m.data.getLength()) {
- etiLog.level(error) << "ETI frame incompletely read";
- throw std::runtime_error("ETI read error");
+ fct = m.ediInput->ediReader.getFct();
+ fp = m.ediInput->ediReader.getFp();
}
- if (first_frame) {
- if (m.etiReader->getFp() != 0) {
+ const unsigned expected_fct = (last_eti_fct + 1) % 250;
+ if (last_eti_fct == -1) {
+ if (fp != 0) {
// Do not start the flowgraph before we get to FP 0
// to ensure all blocks are properly aligned.
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
continue;
}
else {
- first_frame = false;
+ last_eti_fct = fct;
+ m.framecount++;
+ m.flowgraph->run();
}
}
-
- // Check for ETI FCT continuity
- const unsigned expected_fct = (last_eti_fct + 1) % 250;
- const unsigned fct = m.etiReader->getFct();
- if (last_eti_fct != -1 and expected_fct != fct) {
+ else if (fct == expected_fct) {
+ last_eti_fct = fct;
+ m.framecount++;
+ m.flowgraph->run();
+ }
+ else {
etiLog.level(info) << "ETI FCT discontinuity, expected " <<
- expected_fct << " received " << m.etiReader->getFct();
+ expected_fct << " received " << fct;
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
return run_modulator_state_t::again;
}
- last_eti_fct = fct;
- m.flowgraph->run();
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
/* Check every once in a while if the remote control
* is still working */
@@ -574,52 +594,6 @@ static run_modulator_state_t run_modulator(modulator_data& m)
rcs.check_faults();
}
}
- if (framesize == 0) {
- if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
- etiLog.level(info) << "End of file reached.";
- running = 0;
- ret = run_modulator_state_t::normal_end;
- }
-#if defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
- /* An empty frame marks a timeout. We ignore it, but we are
- * now able to handle SIGINT properly.
- *
- * Also, we reconnect zmq every 10 seconds to avoid some
- * issues, discussed in
- * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
- *
- * > It is possible that the PUB socket sees the error
- * > while the SUB socket does not.
- * >
- * > The ZMTP RFC has a proposal for heartbeating that would
- * > solve this problem. The current best solution is for
- * > PUB sockets to send heartbeats (e.g. 1 per second) when
- * > traffic is low, and for SUB sockets to disconnect /
- * > reconnect if they stop getting these.
- *
- * We don't need a heartbeat, because our application is constant frame rate,
- * the frames themselves can act as heartbeats.
- */
-
- const auto now = chrono::steady_clock::now();
- if (last_frame_received + chrono::seconds(10) < now) {
- throw zmq_input_timeout();
- }
- }
-#endif // defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
- /* Same as for ZeroMQ */
- }
- else {
- throw logic_error("Unhandled framesize==0!");
- }
- }
- else {
- etiLog.level(error) << "Input read error.";
- running = 0;
- ret = run_modulator_state_t::normal_end;
- }
}
}
catch (const zmq_input_timeout&) {
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 7e3ccf0..aa4f2a8 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -132,7 +132,7 @@ int DabModulator::process(Buffer* dataOut)
const unsigned mode = m_settings.dabMode;
setMode(mode);
- myFlowgraph = make_shared<Flowgraph>();
+ myFlowgraph = make_shared<Flowgraph>(m_settings.showProcessTime);
////////////////////////////////////////////////////////////////
// CIF data initialisation
////////////////////////////////////////////////////////////////
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index 719966b..25c1ada 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -312,7 +312,6 @@ uint32_t EtiReader::getPPSOffset()
return timestamp;
}
-#ifdef HAVE_EDI
EdiReader::EdiReader(
double& tist_offset_s) :
m_timestamp_decoder(tist_offset_s)
@@ -654,4 +653,15 @@ bool EdiTransport::rxPacket()
}
throw logic_error("Incomplete rxPacket implementation!");
}
-#endif // HAVE_EDI
+
+EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) :
+ ediReader(tist_offset_s),
+ decoder(ediReader, false),
+ ediTransport(decoder)
+{
+ if (edi_max_delay_ms > 0.0f) {
+ // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
+ decoder.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f));
+ }
+}
+
diff --git a/src/EtiReader.h b/src/EtiReader.h
index 99ca715..28fb2ac 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -111,7 +111,6 @@ private:
std::vector<std::shared_ptr<SubchannelSource> > mySources;
};
-#ifdef HAVE_EDI
/* The EdiReader extracts the necessary data using the EDI input library in
* lib/edi
*/
@@ -213,5 +212,13 @@ class EdiTransport {
Socket::TCPClient m_tcpclient;
EdiDecoder::ETIDecoder& m_decoder;
};
-#endif
+
+// EdiInput wraps an EdiReader, an EdiDecoder::ETIDecoder and an EdiTransport
+class EdiInput {
+ public:
+ EdiInput(double& tist_offset_s, float edi_max_delay_ms);
+ EdiReader ediReader;
+ EdiDecoder::ETIDecoder decoder;
+ EdiTransport ediTransport;
+};
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index 4c83fe8..3d4cdcc 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -43,8 +43,7 @@ using EdgeIterator = std::vector<shared_ptr<Edge> >::iterator;
Node::Node(shared_ptr<ModPlugin> plugin) :
- myPlugin(plugin),
- myProcessTime(0)
+ myPlugin(plugin)
{
PDEBUG("Node::Node(plugin(%s): %p) @ %p\n",
plugin->name(), plugin.get(), this);
@@ -237,8 +236,8 @@ Edge::~Edge()
-Flowgraph::Flowgraph() :
- myProcessTime(0)
+Flowgraph::Flowgraph(bool showProcessTime) :
+ myShowProcessTime(showProcessTime)
{
PDEBUG("Flowgraph::Flowgraph() @ %p\n", this);
}
@@ -248,7 +247,7 @@ Flowgraph::~Flowgraph()
{
PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this);
- if (myProcessTime) {
+ if (myShowProcessTime and myProcessTime) {
stringstream ss;
ss << "Process time:\n";
diff --git a/src/Flowgraph.h b/src/Flowgraph.h
index 389359b..753070b 100644
--- a/src/Flowgraph.h
+++ b/src/Flowgraph.h
@@ -71,7 +71,7 @@ protected:
#endif
std::shared_ptr<ModPlugin> myPlugin;
- time_t myProcessTime;
+ time_t myProcessTime = 0;
};
@@ -94,7 +94,7 @@ protected:
class Flowgraph
{
public:
- Flowgraph();
+ Flowgraph(bool showProcessTime);
virtual ~Flowgraph();
Flowgraph(const Flowgraph&) = delete;
Flowgraph& operator=(const Flowgraph&) = delete;
@@ -106,7 +106,8 @@ public:
protected:
std::vector<std::shared_ptr<Node> > nodes;
std::vector<std::shared_ptr<Edge> > edges;
- time_t myProcessTime;
+ time_t myProcessTime = 0;
+ bool myShowProcessTime;
};
diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp
index 5f24095..d6ef917 100644
--- a/src/OutputMemory.cpp
+++ b/src/OutputMemory.cpp
@@ -27,6 +27,7 @@
#include "OutputMemory.h"
#include "PcDebug.h"
#include "Log.h"
+#include "TimestampDecoder.h"
#include <stdexcept>
#include <string.h>
diff --git a/src/Utils.cpp b/src/Utils.cpp
index 50af4fb..6f4f3a3 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -46,9 +46,6 @@ static void printHeader()
#if defined(HAVE_ZEROMQ)
"zeromq " <<
#endif
-#ifdef HAVE_EDI
- "EDI " <<
-#endif
#if defined(HAVE_OUTPUT_UHD)
"output_uhd " <<
#endif